## Bank Marketing Dataset
Contiene datos de una campaña de marketing directo de una institución bancaria portuguesa. El objetivo principal de esta campaña era convencer a los clientes de que suscribieran un depósito a plazo fijo. La tarea es predecir si un cliente se suscribirá a un depósito a plazo fijo. Los atributos son:

<ul>
<li>age: Edad del cliente (numérico).</li>
<li>job: Tipo de trabajo (categórico).</li>
<li>marital: Estado civil (categórico).</li>
<li>education: Nivel de educación (categórico).</li>
<li>default: ¿Tiene crédito en default?.</li>
<li>balance: Balance promedio anual en la cuenta bancaria (numérico, en euros).</li>
<li>housing: ¿Tiene crédito de vivienda?.</li>
<li>loan: ¿Tiene préstamo personal?.</li>
<li>contact: Tipo de comunicación de contacto (categórico).</li>
<li>day: Día del mes en que se realizó el último contacto del cliente (numérico).</li>
<li>month: Mes en que se realizó el último contacto del cliente.</li>
<li>duration: Duración del último contacto en segundos (numérico).</li>
<li>campaign: Número de contactos realizados durante esta campaña para este cliente (numérico).</li>
<li>pdays: Número de días que pasaron después de que el cliente fue contactado por última vez desde una campaña anterior (numérico).</li>
<li>previous: Número de contactos realizados antes de esta campaña para este cliente (numérico).</li>
<li>poutcome: Resultado de la campaña de marketing anterior.</li>
<li>y: Variable objetivo (target) que indica si el cliente ha suscrito un depósito a plazo fijo (binario: "yes", "no").</li>
</ul>

In [1]:
!pip install pyspark



In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,desc
from google.colab import drive
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from os import truncate
from pyspark.sql.functions import col
from pyspark.sql.functions import round
from pyspark.sql.functions import expr
from pyspark.sql.functions import mean
from pyspark.sql.functions import when
from pyspark.sql.functions import avg
from pyspark.sql.functions import floor, count
from pyspark.ml.feature import Imputer
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import Correlation
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import LinearSVC
from pyspark.ml.classification import GBTClassifier, RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [3]:
drive.mount('/content/drive')
path = '/content/drive/MyDrive/Colab Notebooks/proyecto-pyspark/bank.csv'

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [4]:
'''SparkContext: Se utiliza para crear RDDs (Resilient Distributed Datasets), que son la estructura de datos fundamental en Spark. SparkContext también administra la comunicación con el cluster Spark, coordinando la ejecución de operaciones en el cluster y administrando la memoria.
SparkSession: Es una interfaz unificada de nivel superior que reemplaza a las antiguas clases SQLContext y HiveContext en versiones anteriores de Spark. SparkSession proporciona una forma conveniente de trabajar con Apache Spark y los datos estructurados, como DataFrames y Datasets, así como con SQL. Además de encapsular SparkContext, SparkSession proporciona funcionalidades adicionales
En resumen, SparkContext es la interfaz principal para interactuar con Spark en un nivel más bajo, mientras que SparkSession es una capa más alta que proporciona una interfaz más fácil de usar para trabajar con datos estructurados y realizar operaciones SQL
'''

'SparkContext: Se utiliza para crear RDDs (Resilient Distributed Datasets), que son la estructura de datos fundamental en Spark. SparkContext también administra la comunicación con el cluster Spark, coordinando la ejecución de operaciones en el cluster y administrando la memoria.\nSparkSession: Es una interfaz unificada de nivel superior que reemplaza a las antiguas clases SQLContext y HiveContext en versiones anteriores de Spark. SparkSession proporciona una forma conveniente de trabajar con Apache Spark y los datos estructurados, como DataFrames y Datasets, así como con SQL. Además de encapsular SparkContext, SparkSession proporciona funcionalidades adicionales\nEn resumen, SparkContext es la interfaz principal para interactuar con Spark en un nivel más bajo, mientras que SparkSession es una capa más alta que proporciona una interfaz más fácil de usar para trabajar con datos estructurados y realizar operaciones SQL\n'

In [5]:
spark = SparkSession.builder \
    .appName("Bank Analysis") \
    .getOrCreate()

In [6]:
# Leer conjunto de datos con header y separador
df_pyspark=spark.read.option('header','true').csv(path,inferSchema=True, sep=";")
# Ver esquema
df_pyspark.printSchema()
# Mostrar datos
df_pyspark.show()


root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- y: string (nullable = true)

+---+-------------+-------+---------+-------+-------+-------+----+--------+----+-----+--------+--------+-----+--------+--------+---+
|age|          job|marital|education|default|balance|housing|loan| contact| day|month|duration|campaign|pdays|previous|poutcome|  y|
+---+-------------+-------+---------+-------+-------+-------+----+--------+--

In [7]:

# Definir el esquema
schema = StructType([
    StructField("age", IntegerType(), True),
    StructField("job", StringType(), True),
    StructField("marital", StringType(), True),
    StructField("education", StringType(), True),
    StructField("default", StringType(), True),
    StructField("balance", IntegerType(), True),
    StructField("housing", StringType(), True),
    StructField("loan", StringType(), True),
    StructField("contact", StringType(), True),
    StructField("day", IntegerType(), True),
    StructField("month", StringType(), True),
    StructField("duration", IntegerType(), True),
    StructField("campaign", IntegerType(), True),
    StructField("pdays", IntegerType(), True),
    StructField("previous", IntegerType(), True),
    StructField("poutcome", StringType(), True),
    StructField("y", StringType(), True)  # Variable objetivo
])

# Cargar datos
df = spark.read.csv(path, header=True, schema=schema, sep=";")
# Ver esquema
df.printSchema()
# Mostrar datos
df.show()


root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- y: string (nullable = true)

+---+-------------+-------+---------+-------+-------+-------+----+--------+----+-----+--------+--------+-----+--------+--------+---+
|age|          job|marital|education|default|balance|housing|loan| contact| day|month|duration|campaign|pdays|previous|poutcome|  y|
+---+-------------+-------+---------+-------+-------+-------+----+--------+--

In [8]:
# Leer datos
bank_df = spark.read.csv(path, header=True, inferSchema=True , sep=";")
# Ver esquema
bank_df.printSchema()
# Mostrar datos
bank_df.show()

root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- y: string (nullable = true)

+---+-------------+-------+---------+-------+-------+-------+----+--------+----+-----+--------+--------+-----+--------+--------+---+
|age|          job|marital|education|default|balance|housing|loan| contact| day|month|duration|campaign|pdays|previous|poutcome|  y|
+---+-------------+-------+---------+-------+-------+-------+----+--------+--

In [9]:
# Tipo de una variable
type(bank_df)

In [10]:
# Primeros y últimos elementos
bank_df.head(3)
bank_df.tail(3)


[Row(age=58, job='management', marital='married', education='secondary', default='no', balance=37, housing='no', loan='no', contact='cellular', day=18, month='aug', duration=84, campaign=11, pdays=-1, previous=0, poutcome='unknown', y='no'),
 Row(age=34, job='management', marital='single', education='tertiary', default='no', balance=673, housing='yes', loan='yes', contact='cellular', day=20, month='nov', duration=271, campaign=1, pdays=184, previous=1, poutcome='failure', y='no'),
 Row(age=40, job='services', marital='married', education='secondary', default='no', balance=-342, housing='yes', loan='no', contact='unknown', day=26, month='may', duration=754, campaign=1, pdays=-1, previous=0, poutcome='unknown', y='no')]

In [11]:
# Imprimir el tipo de cada columna
for column_name, column_type in bank_df.dtypes:
    print(f"Column: {column_name}, Type: {column_type}")

Column: age, Type: int
Column: job, Type: string
Column: marital, Type: string
Column: education, Type: string
Column: default, Type: string
Column: balance, Type: int
Column: housing, Type: string
Column: loan, Type: string
Column: contact, Type: string
Column: day, Type: int
Column: month, Type: string
Column: duration, Type: int
Column: campaign, Type: int
Column: pdays, Type: int
Column: previous, Type: int
Column: poutcome, Type: string
Column: y, Type: string


In [12]:
# Ver tipos de las columnas
bank_df.dtypes


[('age', 'int'),
 ('job', 'string'),
 ('marital', 'string'),
 ('education', 'string'),
 ('default', 'string'),
 ('balance', 'int'),
 ('housing', 'string'),
 ('loan', 'string'),
 ('contact', 'string'),
 ('day', 'int'),
 ('month', 'string'),
 ('duration', 'int'),
 ('campaign', 'int'),
 ('pdays', 'int'),
 ('previous', 'int'),
 ('poutcome', 'string'),
 ('y', 'string')]

In [13]:
# Estadísticas de variables
print("Análisis algunas variables:")
bank_df.select("age", "balance", "duration").describe().show()

Análisis algunas variables:
+-------+------------------+------------------+------------------+
|summary|               age|           balance|          duration|
+-------+------------------+------------------+------------------+
|  count|               999|               999|               999|
|   mean|  41.1951951951952|1499.8008008008007|263.87187187187186|
| stddev|10.446875674557257|2779.5450521178586|273.89968542323953|
|    min|                19|             -1680|                 5|
|    max|                83|             26965|              3025|
+-------+------------------+------------------+------------------+



In [14]:
# Describe controlando resultados
from pyspark.sql.functions import format_number
result = bank_df.describe()
result.select(result['summary'],
              format_number(result['age'].cast('float'),2).alias('age'),
              format_number(result['balance'].cast('float'),2).alias('balance'),
              format_number(result['duration'].cast('float'),2).alias('duration')
             ).show()


+-------+------+---------+--------+
|summary|   age|  balance|duration|
+-------+------+---------+--------+
|  count|999.00|   999.00|  999.00|
|   mean| 41.20| 1,499.80|  263.87|
| stddev| 10.45| 2,779.55|  273.90|
|    min| 19.00|-1,680.00|    5.00|
|    max| 83.00|26,965.00|3,025.00|
+-------+------+---------+--------+



In [15]:
# Crear columna
df2 = bank_df.withColumn("Age balance ratio",bank_df["balance"]/bank_df["age"])#.show()
df2.select('Age balance ratio').show()

+------------------+
| Age balance ratio|
+------------------+
| 59.56666666666667|
|145.12121212121212|
| 38.57142857142857|
|              49.2|
|               0.0|
| 21.34285714285714|
| 8.527777777777779|
| 3.769230769230769|
| 5.390243902439025|
|-2.046511627906977|
|240.35897435897436|
|6.1395348837209305|
|30.805555555555557|
|              25.1|
|11.612903225806452|
|              4.85|
| 72.73214285714286|
| 62.62162162162162|
|             -8.84|
| 4.258064516129032|
+------------------+
only showing top 20 rows



In [16]:
# Obtener máximo
bank_df.orderBy(bank_df["age"].desc()).head(1)[0][0]

83

In [17]:
# Máximo y mínimo
from pyspark.sql.functions import max,min
bank_df.select(max("age"),min("age")).show()

+--------+--------+
|max(age)|min(age)|
+--------+--------+
|      83|      19|
+--------+--------+



In [18]:

# Correlacción
from pyspark.sql.functions import corr
bank_df.select(corr("age","balance")).show()

+-------------------+
| corr(age, balance)|
+-------------------+
|0.07873130693166437|
+-------------------+



In [19]:
# Filtrar por dos condiciones
bank_df.filter( (bank_df["age"] > 30) & (bank_df['balance'] > 200) ).show()

+---+-------------+--------+---------+-------+-------+-------+----+---------+----+-----+--------+--------+-----+--------+--------+---+
|age|          job| marital|education|default|balance|housing|loan|  contact| day|month|duration|campaign|pdays|previous|poutcome|  y|
+---+-------------+--------+---------+-------+-------+-------+----+---------+----+-----+--------+--------+-----+--------+--------+---+
| 33|     services| married|secondary|     no|   4789|    yes| yes| cellular|  11|  may|     220|       1|  339|       4| failure| no|
| 35|   management|  single| tertiary|     no|   1350|    yes|  no| cellular|  16|  apr|     185|       1|  330|       1| failure| no|
| 35|   management|  single| tertiary|     no|    747|     no|  no| cellular|  23|  feb|     141|       2|  176|       3| failure| no|
| 36|self-employed| married| tertiary|     no|    307|    yes|  no| cellular|NULL|  may|     341|       1|  330|       2|   other| no|
| 41| entrepreneur| married| tertiary|     no|    221| 

In [20]:
# Filtrar con or
bank_df.filter( (bank_df["age"] > 30) | (bank_df['balance'] > 200) ).show()

+---+-------------+--------+---------+-------+-------+-------+----+--------+----+-----+--------+--------+-----+--------+--------+---+
|age|          job| marital|education|default|balance|housing|loan| contact| day|month|duration|campaign|pdays|previous|poutcome|  y|
+---+-------------+--------+---------+-------+-------+-------+----+--------+----+-----+--------+--------+-----+--------+--------+---+
| 30|   unemployed| married|  primary|     no|   1787|     no|  no|cellular|  19|  oct|      79|       1|   -1|       0| unknown| no|
| 33|     services| married|secondary|     no|   4789|    yes| yes|cellular|  11|  may|     220|       1|  339|       4| failure| no|
| 35|   management|  single| tertiary|     no|   1350|    yes|  no|cellular|  16|  apr|     185|       1|  330|       1| failure| no|
| 30|   management| married| tertiary|     no|   1476|    yes| yes| unknown|   3|  jun|     199|       4|   -1|       0| unknown| no|
| 59|  blue-collar| married|secondary|     no|      0|    yes|

In [21]:
# Filtrar con opuesto
bank_df.filter( (bank_df["age"] >20) & ~(bank_df['balance'] > 200) ).show()

+---+------------+--------+---------+-------+-------+-------+----+---------+---+-----+--------+--------+-----+--------+--------+---+
|age|         job| marital|education|default|balance|housing|loan|  contact|day|month|duration|campaign|pdays|previous|poutcome|  y|
+---+------------+--------+---------+-------+-------+-------+----+---------+---+-----+--------+--------+-----+--------+--------+---+
| 59| blue-collar| married|secondary|     no|      0|    yes|  no|  unknown|  5|  may|     226|       1|   -1|       0| unknown| no|
| 39|  technician| married|secondary|     no|    147|    yes|  no| cellular|  6|  may|     151|       2|   -1|       0| unknown| no|
| 43|    services| married|  primary|     no|    -88|    yes| yes| cellular| 17|  apr|     313|       1|  147|       2| failure| no|
| 40|  management| married| tertiary|     no|    194|     no| yes| cellular| 29|  aug|     189|       2|   -1|       0| unknown| no|
| 25| blue-collar|  single|  primary|     no|   -221|    yes|  no|  u

In [22]:
# Filtrar por valor concreto
bank_df.filter(bank_df["age"] == 35).show()

+---+-------------+--------+---------+-------+-------+-------+----+--------+---+-----+--------+--------+-----+--------+--------+---+
|age|          job| marital|education|default|balance|housing|loan| contact|day|month|duration|campaign|pdays|previous|poutcome|  y|
+---+-------------+--------+---------+-------+-------+-------+----+--------+---+-----+--------+--------+-----+--------+--------+---+
| 35|   management|  single| tertiary|     no|   1350|    yes|  no|cellular| 16|  apr|     185|       1|  330|       1| failure| no|
| 35|   management|  single| tertiary|     no|    747|     no|  no|cellular| 23|  feb|     141|       2|  176|       3| failure| no|
| 35|  blue-collar|  single|  primary|     no|    293|    yes|  no| unknown| 30|  may|     521|       2|   -1|       0| unknown| no|
| 35|   management| married| tertiary|     no|    106|     no| yes|cellular| 11|  aug|     588|       2|   -1|       0| unknown| no|
| 35|   technician|divorced|secondary|     no|    308|     no|  no|ce

In [23]:
# Cargar con fechas
df_dates = spark.read.csv("/content/drive/MyDrive/Colab Notebooks/proyecto-pyspark/appl_stock.csv",header=True,inferSchema=True)

In [24]:
from pyspark.sql.functions import format_number,dayofmonth,hour,dayofyear,month,year,weekofyear,date_format
# Dia del mes
df_dates.select(dayofmonth(df_dates['Date'])).show()

+----------------+
|dayofmonth(Date)|
+----------------+
|               4|
|               5|
|               6|
|               7|
|               8|
|              11|
|              12|
|              13|
|              14|
|              15|
|              19|
|              20|
|              21|
|              22|
|              25|
|              26|
|              27|
|              28|
|              29|
|               1|
+----------------+
only showing top 20 rows



In [25]:
# Hora
df_dates.select(hour(df_dates['Date'])).show()

+----------+
|hour(Date)|
+----------+
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
+----------+
only showing top 20 rows



In [26]:
# Año
df_dates.select(year(df_dates['Date'])).show()

+----------+
|year(Date)|
+----------+
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
+----------+
only showing top 20 rows



In [27]:
# Extraer el año, agrupar y calcular media
newdf = df_dates.withColumn("Year",year(df_dates['Date']))
result = newdf.groupBy("Year").mean()[['avg(Year)','avg(Close)']]
result = result.withColumnRenamed("avg(Year)","Year")
result = result.select('Year',format_number('avg(Close)',2).alias("Mean Close")).show()

+------+----------+
|  Year|Mean Close|
+------+----------+
|2015.0|    120.04|
|2013.0|    472.63|
|2014.0|    295.40|
|2012.0|    576.05|
|2016.0|    104.60|
|2010.0|    259.84|
|2011.0|    364.00|
+------+----------+



In [28]:
# Agrupa por age y calcula el min del resto
bank_df.groupBy("age").min().show()

+---+--------+------------+--------+-------------+-------------+----------+-------------+
|age|min(age)|min(balance)|min(day)|min(duration)|min(campaign)|min(pdays)|min(previous)|
+---+--------+------------+--------+-------------+-------------+----------+-------------+
| 31|      31|        -253|       2|           43|            1|        -1|            0|
| 65|      65|        1840|       1|          383|            2|       188|            5|
| 53|      53|         -22|       4|           20|            1|        -1|            0|
| 78|      78|         229|      16|           80|            1|        -1|            0|
| 34|      34|        -370|       2|            9|            1|        -1|            0|
| 81|      81|           1|      19|           65|            5|        -1|            0|
| 28|      28|        -298|       1|           22|            1|        -1|            0|
| 26|      26|         -32|       5|           14|            1|        -1|            0|
| 27|     

In [29]:
# Contar distintos
from pyspark.sql.functions import countDistinct, stddev
bank_df.select(countDistinct("age")).show()

+-------------------+
|count(DISTINCT age)|
+-------------------+
|                 57|
+-------------------+



In [30]:
# Desviación con control decimales
age_std = bank_df.select(stddev("age").alias('std'))
age_std.select(format_number('std',2)).show()

+---------------------+
|format_number(std, 2)|
+---------------------+
|                10.45|
+---------------------+



In [31]:
# Seleccionar columnas, sin show para guardarlo en otro dataframe
bank_df.select(['age','job']).show()

# Seleccionar columnas
finalized_data=bank_df.select('age','job')

# Seleccionar columna
bank_df.select(bank_df.age).show(truncate=False)

# Otra forma
bank_df.select(bank_df['age']).show(truncate=False)

# Otra forma
bank_df.select(col('age')).show(truncate=False)

# Seleccionar múltiples columnas
bank_df.select(bank_df.age, bank_df.job).show(truncate=False)

# Otra forma
bank_df.select(bank_df['age'], bank_df['job']).show(truncate=False)

# Otra forma
bank_df.select(col('age'), col('job')).show(truncate=False)

# Seleccionar columnas
selected_columns = bank_df.select("age", "job", "education")
print("Columnas seleccionadas:")
selected_columns.show(10)


+---+-------------+
|age|          job|
+---+-------------+
| 30|   unemployed|
| 33|     services|
| 35|   management|
| 30|   management|
| 59|  blue-collar|
| 35|   management|
| 36|self-employed|
| 39|   technician|
| 41| entrepreneur|
| 43|     services|
| 39|     services|
| 43|       admin.|
| 36|   technician|
| 20|      student|
| 31|  blue-collar|
| 40|   management|
| 56|   technician|
| 37|       admin.|
| 25|  blue-collar|
| 31|     services|
+---+-------------+
only showing top 20 rows

+---+
|age|
+---+
|30 |
|33 |
|35 |
|30 |
|59 |
|35 |
|36 |
|39 |
|41 |
|43 |
|39 |
|43 |
|36 |
|20 |
|31 |
|40 |
|56 |
|37 |
|25 |
|31 |
+---+
only showing top 20 rows

+---+
|age|
+---+
|30 |
|33 |
|35 |
|30 |
|59 |
|35 |
|36 |
|39 |
|41 |
|43 |
|39 |
|43 |
|36 |
|20 |
|31 |
|40 |
|56 |
|37 |
|25 |
|31 |
+---+
only showing top 20 rows

+---+
|age|
+---+
|30 |
|33 |
|35 |
|30 |
|59 |
|35 |
|36 |
|39 |
|41 |
|43 |
|39 |
|43 |
|36 |
|20 |
|31 |
|40 |
|56 |
|37 |
|25 |
|31 |
+---+
only showi

In [32]:
# Añadir columna en base a otra columna
bank_df=bank_df.withColumn('Balance plus 100',bank_df['balance']+100)

# Añadir columna calculada
bank_df = bank_df.withColumn("Balance multiplied", bank_df.balance * bank_df.duration)
print("DataFrame with New Column:")
bank_df.show(10)


DataFrame with New Column:
+---+-------------+-------+---------+-------+-------+-------+----+--------+----+-----+--------+--------+-----+--------+--------+---+----------------+------------------+
|age|          job|marital|education|default|balance|housing|loan| contact| day|month|duration|campaign|pdays|previous|poutcome|  y|Balance plus 100|Balance multiplied|
+---+-------------+-------+---------+-------+-------+-------+----+--------+----+-----+--------+--------+-----+--------+--------+---+----------------+------------------+
| 30|   unemployed|married|  primary|     no|   1787|     no|  no|cellular|  19|  oct|      79|       1|   -1|       0| unknown| no|            1887|            141173|
| 33|     services|married|secondary|     no|   4789|    yes| yes|cellular|  11|  may|     220|       1|  339|       4| failure| no|            4889|           1053580|
| 35|   management| single| tertiary|     no|   1350|    yes|  no|cellular|  16|  apr|     185|       1|  330|       1| failure|

In [33]:
# Renombrar columna
bank_df.withColumnRenamed('Balance plus 100','Balance 100').show()
bank_df.show(10)


+---+-------------+-------+---------+-------+-------+-------+----+--------+----+-----+--------+--------+-----+--------+--------+---+-----------+------------------+
|age|          job|marital|education|default|balance|housing|loan| contact| day|month|duration|campaign|pdays|previous|poutcome|  y|Balance 100|Balance multiplied|
+---+-------------+-------+---------+-------+-------+-------+----+--------+----+-----+--------+--------+-----+--------+--------+---+-----------+------------------+
| 30|   unemployed|married|  primary|     no|   1787|     no|  no|cellular|  19|  oct|      79|       1|   -1|       0| unknown| no|       1887|            141173|
| 33|     services|married|secondary|     no|   4789|    yes| yes|cellular|  11|  may|     220|       1|  339|       4| failure| no|       4889|           1053580|
| 35|   management| single| tertiary|     no|   1350|    yes|  no|cellular|  16|  apr|     185|       1|  330|       1| failure| no|       1450|            249750|
| 30|   manageme

In [34]:
# Eliminar columna
bank_df=bank_df.drop("Balance 100", "Balance multiplied", "Balance plus 100")
bank_df.show(10)


+---+-------------+-------+---------+-------+-------+-------+----+--------+----+-----+--------+--------+-----+--------+--------+---+
|age|          job|marital|education|default|balance|housing|loan| contact| day|month|duration|campaign|pdays|previous|poutcome|  y|
+---+-------------+-------+---------+-------+-------+-------+----+--------+----+-----+--------+--------+-----+--------+--------+---+
| 30|   unemployed|married|  primary|     no|   1787|     no|  no|cellular|  19|  oct|      79|       1|   -1|       0| unknown| no|
| 33|     services|married|secondary|     no|   4789|    yes| yes|cellular|  11|  may|     220|       1|  339|       4| failure| no|
| 35|   management| single| tertiary|     no|   1350|    yes|  no|cellular|  16|  apr|     185|       1|  330|       1| failure| no|
| 30|   management|married| tertiary|     no|   1476|    yes| yes| unknown|   3|  jun|     199|       4|   -1|       0| unknown| no|
| 59|  blue-collar|married|secondary|     no|      0|    yes|  no| un

In [35]:
# Personas con age<30 si se hace sin show se puede guardar en otro dataframe
bank_df.filter("age<30").show()

# Solo mostrando columnas necesarias
bank_df.filter("age<30").select(['job','age']).show()

# Otra forma
bank_df.filter(bank_df['age']<30).show()

# or con | y and con &
bank_df.filter((bank_df['age']<30) &
                  (bank_df['age']>35)).show()

# Ppuesto a menores de 30
bank_df.filter(~(bank_df['age']<=30)).show()

# Filtrar segun condición
filtered_data = bank_df.filter(bank_df.age <30)
print("Datos filtrados:", filtered_data.count())
filtered_data.show()


+---+-------------+--------+---------+-------+-------+-------+----+---------+----+-----+--------+--------+-----+--------+--------+---+
|age|          job| marital|education|default|balance|housing|loan|  contact| day|month|duration|campaign|pdays|previous|poutcome|  y|
+---+-------------+--------+---------+-------+-------+-------+----+---------+----+-----+--------+--------+-----+--------+--------+---+
| 20|      student|  single|secondary|     no|    502|     no|  no| cellular|  30|  apr|     261|       1|   -1|       0| unknown|yes|
| 25|  blue-collar|  single|  primary|     no|   -221|    yes|  no|  unknown|  23|  may|     250|       1|   -1|       0|    NULL| no|
| 26|    housemaid| married| tertiary|     no|    543|     no|  no| cellular|  30|  jan|     169|       3|   -1|       0|    NULL| no|
| 23|     services|  single| tertiary|     no|    363|    yes|  no|  unknown|  30|  may|      16|      18|   -1|       0| unknown| no|
| 26|  blue-collar| married|  primary|     no|      0| 

In [36]:
# Convertir la columna 'pdays' de int a string
bank_df = bank_df.withColumn("pdays_string", bank_df["pdays"].cast(StringType()))
# Mostrar el esquema y los datos después de la conversión
bank_df.printSchema()
bank_df.show()

# Convertir la columna 'pdays_string' de string a int
bank_df = bank_df.withColumn("pdays_int", bank_df["pdays_string"].cast(IntegerType()))

# Mostrar el esquema y los datos después de la conversión
bank_df.printSchema()
bank_df.show()


root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- y: string (nullable = true)
 |-- pdays_string: string (nullable = true)

+---+-------------+-------+---------+-------+-------+-------+----+--------+----+-----+--------+--------+-----+--------+--------+---+------------+
|age|          job|marital|education|default|balance|housing|loan| contact| day|month|duration|campaign|pdays|previous|poutcome|  y|pdays_string|
+---+--

In [37]:
# Eliminar las columnas
bank_df=bank_df.drop("pdays_string", "pdays_int")
bank_df.show(10)

+---+-------------+-------+---------+-------+-------+-------+----+--------+----+-----+--------+--------+-----+--------+--------+---+
|age|          job|marital|education|default|balance|housing|loan| contact| day|month|duration|campaign|pdays|previous|poutcome|  y|
+---+-------------+-------+---------+-------+-------+-------+----+--------+----+-----+--------+--------+-----+--------+--------+---+
| 30|   unemployed|married|  primary|     no|   1787|     no|  no|cellular|  19|  oct|      79|       1|   -1|       0| unknown| no|
| 33|     services|married|secondary|     no|   4789|    yes| yes|cellular|  11|  may|     220|       1|  339|       4| failure| no|
| 35|   management| single| tertiary|     no|   1350|    yes|  no|cellular|  16|  apr|     185|       1|  330|       1| failure| no|
| 30|   management|married| tertiary|     no|   1476|    yes| yes| unknown|   3|  jun|     199|       4|   -1|       0| unknown| no|
| 59|  blue-collar|married|secondary|     no|      0|    yes|  no| un

In [38]:
# Agrupación
print("Número de clientes por job:")
bank_df.groupBy("job").count().show()

Número de clientes por job:
+-------------+-----+
|          job|count|
+-------------+-----+
|   management|  221|
|      retired|   52|
|      unknown|    5|
|self-employed|   53|
|      student|   20|
|  blue-collar|  206|
| entrepreneur|   40|
|       admin.|   96|
|   technician|  181|
|     services|   76|
|    housemaid|   24|
|   unemployed|   25|
+-------------+-----+



In [39]:
# Agrupa por housing y calcula media de age
print("Media de age agruando por housing:")
bank_df.groupBy("housing").agg({"age": "mean"}).show()

Media de age agruando por housing:
+-------+-----------------+
|housing|         avg(age)|
+-------+-----------------+
|     no|43.29438202247191|
|    yes|39.50902527075812|
+-------+-----------------+



In [40]:
# Media de balance
print("Balance medio de los clientes:")
bank_df.agg({"balance": "mean"}).show()

Balance medio de los clientes:
+------------------+
|      avg(balance)|
+------------------+
|1499.8008008008007|
+------------------+



In [41]:
# Contar según la clase con filtro
print("Número de clientes suscritos:")
bank_df.filter(col("y") == "yes").count()

Número de clientes suscritos:


114

In [42]:
# Realizar la agrupación y el conteo
grouped_df = bank_df.groupBy('job', 'education').count()

# Convertir el DataFrame de PySpark a un DataFrame de Pandas
pandas_df = grouped_df.toPandas()

# Guardar el DataFrame de Pandas en un archivo csv
pandas_df.to_csv("/content/drive/MyDrive/Colab Notebooks/proyecto-pyspark/resultado_agrupado.xlsx", index=False)


In [43]:
# Cuenta filas
num_fils = bank_df.count()
print("Total de registros:", num_fils)

# Columnas
num_cols = len(bank_df.columns)

# Imprimir con formato
print("El DataFrame tiene {} filas y {} columnas.".format(num_fils, num_cols))


Total de registros: 999
El DataFrame tiene 999 filas y 17 columnas.


In [44]:
# Contar con filtro
default_no= bank_df.filter(col('default')=="no").count()
print("Total de clientes con default no:", default_no)

default_no_housing= bank_df.filter((col('default')=="no")&(col('housing')=="yes")).count()
print("Total de clientes con default no y housing yes:", default_no_housing)


Total de clientes con default no: 978
Total de clientes con default no y housing yes: 541


In [45]:
# Ordenar resultados
# Ascending se puede quitar
bank_df.orderBy('age',ascending=False).show(truncate=False)

# Ordenar pr columna
sorted_data = bank_df.orderBy("balance")
print("Sorted Data:")
sorted_data.show(10)

# Orden descendente
sorted_data = bank_df.orderBy(col("balance").desc(), col("age").desc())
print("Sorted Data de forma descendente:")
sorted_data.show(10)


+---+-----------+--------+---------+-------+-------+-------+----+---------+---+-----+--------+--------+-----+--------+--------+---+
|age|job        |marital |education|default|balance|housing|loan|contact  |day|month|duration|campaign|pdays|previous|poutcome|y  |
+---+-----------+--------+---------+-------+-------+-------+----+---------+---+-----+--------+--------+-----+--------+--------+---+
|83 |retired    |married |secondary|no     |0      |no     |no  |cellular |18 |mar  |140     |10      |-1   |0       |unknown |no |
|81 |retired    |married |secondary|no     |1      |no     |no  |cellular |19 |aug  |65      |5       |-1   |0       |unknown |no |
|80 |management |married |primary  |no     |6483   |no     |no  |telephone|19 |oct  |123     |1       |-1   |0       |unknown |no |
|78 |retired    |divorced|primary  |no     |229    |no     |no  |telephone|22 |oct  |97      |1       |-1   |0       |unknown |yes|
|78 |housemaid  |married |secondary|no     |499    |no     |no  |telephone|1

In [46]:
# Seleccionar elementos distintos
distinct_rows = bank_df.select("education").distinct()
print("Diferentes valores en education:")
distinct_rows.show()


Diferentes valores en education:
+---------+
|education|
+---------+
|  unknown|
| tertiary|
|secondary|
|  primary|
+---------+



In [47]:
# Ordenar group by resultado
bank_df.groupBy('education').count().orderBy('count', ascending=False).show()


+---------+-----+
|education|count|
+---------+-----+
|secondary|  491|
| tertiary|  313|
|  primary|  155|
|  unknown|   40|
+---------+-----+



In [48]:
# Agrupar y realizar cálculo
# Agrupo por education
bank_df.groupBy('education').sum("balance").show()

# Agrupo por education y calculo media
bank_df.groupBy('education').avg("balance").show()

# Agrupo por education y calculo media
bank_df.groupBy('education').mean("balance").show()

# Agrupo por education y cuento
bank_df.groupBy('education').count().show()

# Agrupar y aplicar diferentes funciones sobre cada columna
grouped_data = bank_df.groupBy("education").agg({"balance": "sum", "age": "avg"})
print("Agrupado por education:")
grouped_data.show()

# Agrupa por education y calcula media si se suscribe al depósito
print("Media de edad por education:")
bank_df.groupBy("education").agg({"age": "mean"}).show()


+---------+------------+
|education|sum(balance)|
+---------+------------+
|  unknown|       52421|
| tertiary|      579767|
|secondary|      605377|
|  primary|      260736|
+---------+------------+

+---------+------------------+
|education|      avg(balance)|
+---------+------------------+
|  unknown|          1310.525|
| tertiary|1852.2907348242811|
|secondary| 1232.947046843177|
|  primary| 1682.167741935484|
+---------+------------------+

+---------+------------------+
|education|      avg(balance)|
+---------+------------------+
|  unknown|          1310.525|
| tertiary|1852.2907348242811|
|secondary| 1232.947046843177|
|  primary| 1682.167741935484|
+---------+------------------+

+---------+-----+
|education|count|
+---------+-----+
|  unknown|   40|
| tertiary|  313|
|secondary|  491|
|  primary|  155|
+---------+-----+

Agrupado por education:
+---------+------------+------------------+
|education|sum(balance)|          avg(age)|
+---------+------------+------------------+


In [49]:
# Suma de todos los balance
bank_df.agg({'balance':'sum'}).show()


+------------+
|sum(balance)|
+------------+
|     1498301|
+------------+



In [50]:
# Matriz de correlación, con loan y housing
print("Distribución de loan con housing:")
loan_by_housing = bank_df.crosstab("loan", "housing")
loan_by_housing.show()

Distribución de loan con housing:
+------------+---+---+
|loan_housing| no|yes|
+------------+---+---+
|          no|372|463|
|         yes| 73| 91|
+------------+---+---+



In [51]:
# Similar pero con group by
print("Distribución de loan con housing:")
loan_by_housing = bank_df.groupBy("loan", "housing").count().orderBy("loan", "housing")
loan_by_housing.show()

Distribución de loan con housing:
+----+-------+-----+
|loan|housing|count|
+----+-------+-----+
|  no|     no|  372|
|  no|    yes|  463|
| yes|     no|   73|
| yes|    yes|   91|
+----+-------+-----+



In [52]:
# Group by de las tres variables
grouped_df = bank_df.groupBy("job", "marital", "education").count().orderBy("job")
grouped_df.show()

+-----------+--------+---------+-----+
|        job| marital|education|count|
+-----------+--------+---------+-----+
|     admin.|  single|  unknown|    1|
|     admin.|divorced| tertiary|    1|
|     admin.| married|  unknown|    3|
|     admin.| married|secondary|   43|
|     admin.|  single| tertiary|    4|
|     admin.|  single|secondary|   22|
|     admin.| married| tertiary|    7|
|     admin.| married|  primary|    3|
|     admin.|divorced|  unknown|    1|
|     admin.|divorced|  primary|    1|
|     admin.|divorced|secondary|   10|
|blue-collar| married|  primary|   58|
|blue-collar|  single|secondary|   27|
|blue-collar| married|  unknown|    2|
|blue-collar|  single|  unknown|    1|
|blue-collar|divorced|secondary|    7|
|blue-collar|divorced|  primary|   10|
|blue-collar| married| tertiary|    1|
|blue-collar|divorced|  unknown|    2|
|blue-collar|  single|  primary|   13|
+-----------+--------+---------+-----+
only showing top 20 rows



In [53]:
# Total de clientes
total_clientes = bank_df.count()
total_clientes

999

In [54]:
from pyspark.sql.functions import count, col
# Número de personas con préstamo personal y su porcentaje
loan_distribution = bank_df.groupBy("loan").agg(
    count("*").alias("count"),
    ((count("*") / total_clientes) * 100).alias("percentage")
)

In [55]:
# Mostrar los resultados
print("Número de clientes con préstamo personal y su porcentaje sobre el total de clientes:")
loan_distribution.show()

Número de clientes con préstamo personal y su porcentaje sobre el total de clientes:
+----+-----+------------------+
|loan|count|        percentage|
+----+-----+------------------+
|  no|  835| 83.58358358358359|
| yes|  164|16.416416416416414|
+----+-----+------------------+



In [56]:
# Porcentaje combinación atributos
y_by_loan_housing_percentage = bank_df.groupBy("loan", "housing", "y").agg(
    (count("*") / total_clientes * 100).alias("percentage")
)

# Redondear
y_by_loan_housing_percentage = y_by_loan_housing_percentage.withColumn("percentage", round("percentage", 2))

# Ordenar
y_by_loan_housing_percentage = y_by_loan_housing_percentage.orderBy("loan", "housing", "y")

# Tabla
print("Porcentaje de clientes para cada combinación de loan, housing y si se ha suscrito al depósito:")
y_by_loan_housing_percentage.show()

Porcentaje de clientes para cada combinación de loan, housing y si se ha suscrito al depósito:
+----+-------+---+----------+
|loan|housing|  y|percentage|
+----+-------+---+----------+
|  no|     no| no|     31.03|
|  no|     no|yes|      6.21|
|  no|    yes| no|     42.14|
|  no|    yes|yes|       4.2|
| yes|     no| no|      6.71|
| yes|     no|yes|       0.6|
| yes|    yes| no|      8.71|
| yes|    yes|yes|       0.4|
+----+-------+---+----------+



In [57]:

# Definir rangos de edades
age_ranges = [(i, i + 4) for i in range(0, 100, 5)]

In [58]:
# Columna con rango de edad
bank_df = bank_df.withColumn("AgeRange", expr(
    "CASE WHEN Age IS NULL THEN 'Unknown' " +
    "ELSE CAST(FLOOR(Age / 5) * 5 AS INT) + 1 || '-' || CAST(FLOOR(Age / 5) * 5 + 5 AS STRING) END"
))

bank_df.show()

+---+-------------+-------+---------+-------+-------+-------+----+--------+----+-----+--------+--------+-----+--------+--------+---+--------+
|age|          job|marital|education|default|balance|housing|loan| contact| day|month|duration|campaign|pdays|previous|poutcome|  y|AgeRange|
+---+-------------+-------+---------+-------+-------+-------+----+--------+----+-----+--------+--------+-----+--------+--------+---+--------+
| 30|   unemployed|married|  primary|     no|   1787|     no|  no|cellular|  19|  oct|      79|       1|   -1|       0| unknown| no|   31-35|
| 33|     services|married|secondary|     no|   4789|    yes| yes|cellular|  11|  may|     220|       1|  339|       4| failure| no|   31-35|
| 35|   management| single| tertiary|     no|   1350|    yes|  no|cellular|  16|  apr|     185|       1|  330|       1| failure| no|   36-40|
| 30|   management|married| tertiary|     no|   1476|    yes| yes| unknown|   3|  jun|     199|       4|   -1|       0| unknown| no|   31-35|
| 59| 

In [59]:
# Agrupar y ordenar
age_distribution = bank_df.groupBy("AgeRange").count().orderBy("AgeRange")

# Mostrar
print("Tabla de frecuencia de clientes por rangos de edades de 5 años:")
age_distribution.show()

Tabla de frecuencia de clientes por rangos de edades de 5 años:
+--------+-----+
|AgeRange|count|
+--------+-----+
|   16-20|    1|
|   21-25|   12|
|   26-30|   95|
|   31-35|  210|
|   36-40|  176|
|   41-45|  155|
|   46-50|  117|
|   51-55|   95|
|   56-60|  104|
|   61-65|   19|
|   66-70|    5|
|   71-75|    1|
|   76-80|    6|
|   81-85|    3|
+--------+-----+



In [60]:


# Agrupar por job y calcular media edad
age_by_job = bank_df.groupBy("job").agg(round(mean("Age"),2).alias("AverageAge")).na.drop()

# Trabajo con edad media más alta
job_with_highest_average_age = age_by_job.orderBy(age_by_job["AverageAge"].desc()).first()

# Resultado
print("El trabajo donde la edad media de los clientes es la más alta es:", job_with_highest_average_age["job"])
print("La edad media en ese trabajo es:", job_with_highest_average_age["AverageAge"])


El trabajo donde la edad media de los clientes es la más alta es: retired
La edad media en ese trabajo es: 60.83


In [61]:

# Evaluar si se hicieron contactos previos
bank_df_with_previous = bank_df.withColumn("HadPrevious", when(col("previous") > 0, 1).otherwise(0))

# Porcentaje clientes se suscribieron con contactos previos
yes_percentage_by_previous = bank_df_with_previous.groupBy("HadPrevious").agg(
    (round((count(when(col("y") == "yes", True)) / count("*") * 100), 2)).alias("YesPercentage")
)

# Resultado
print("Porcentaje de clientes que se suscribieron al depósito con contactos previos:")
yes_percentage_by_previous.show()


Porcentaje de clientes que se suscribieron al depósito con contactos previos:
+-----------+-------------+
|HadPrevious|YesPercentage|
+-----------+-------------+
|          1|        18.23|
|          0|         9.79|
+-----------+-------------+



In [62]:

# Edad media por loan y clase objetivo
average_age_by_loan_and_y = bank_df.groupBy("loan", "y") \
    .agg(round(avg("age"), 2).alias("AverageAge"))

average_age_by_loan_and_y = average_age_by_loan_and_y.orderBy(average_age_by_loan_and_y["AverageAge"].desc())

# resultado
print("Edad media por loan y si se suscribieron al depósito:")
average_age_by_loan_and_y.show()


Edad media por loan y si se suscribieron al depósito:
+----+---+----------+
|loan|  y|AverageAge|
+----+---+----------+
|  no|yes|     41.76|
| yes| no|     41.68|
|  no| no|      41.1|
| yes|yes|      34.7|
+----+---+----------+



In [63]:

# Otra forma de incorporar intervalos
bank_df_with_age_group = bank_df.withColumn("AgeGroup", floor(col("age") / 5) * 5)
bank_df_with_age_group.show()

+---+-------------+-------+---------+-------+-------+-------+----+--------+----+-----+--------+--------+-----+--------+--------+---+--------+--------+
|age|          job|marital|education|default|balance|housing|loan| contact| day|month|duration|campaign|pdays|previous|poutcome|  y|AgeRange|AgeGroup|
+---+-------------+-------+---------+-------+-------+-------+----+--------+----+-----+--------+--------+-----+--------+--------+---+--------+--------+
| 30|   unemployed|married|  primary|     no|   1787|     no|  no|cellular|  19|  oct|      79|       1|   -1|       0| unknown| no|   31-35|      30|
| 33|     services|married|secondary|     no|   4789|    yes| yes|cellular|  11|  may|     220|       1|  339|       4| failure| no|   31-35|      30|
| 35|   management| single| tertiary|     no|   1350|    yes|  no|cellular|  16|  apr|     185|       1|  330|       1| failure| no|   36-40|      35|
| 30|   management|married| tertiary|     no|   1476|    yes| yes| unknown|   3|  jun|     199

In [64]:
# Agrupar por intervalo de edad y loan y contar. Loan No y AgeGroup 55 hay 10 clientes por ejemplo
age_class_counts = bank_df_with_age_group.groupBy("AgeGroup", "loan").count()
age_class_counts.show()

+--------+----+-----+
|AgeGroup|loan|count|
+--------+----+-----+
|      55| yes|   20|
|      20| yes|    4|
|      55|  no|   84|
|      15|  no|    1|
|      65|  no|    5|
|      45|  no|  100|
|      25|  no|   80|
|      40|  no|  125|
|      75|  no|    6|
|      25| yes|   15|
|      20|  no|    8|
|      30|  no|  180|
|      50| yes|   18|
|      70|  no|    1|
|      35| yes|   27|
|      60| yes|    3|
|      45| yes|   17|
|      35|  no|  149|
|      60|  no|   16|
|      30| yes|   30|
+--------+----+-----+
only showing top 20 rows



In [65]:
# Contar el número de clientes en cada grupo de edades. Agegroup 35 de todos los loan =20 clientes
total_in_age_group = bank_df_with_age_group.groupBy("AgeGroup").agg(count("*").alias("TotalCount"))
total_in_age_group.show()

+--------+----------+
|AgeGroup|TotalCount|
+--------+----------+
|      65|         5|
|      50|        95|
|      25|        95|
|      55|       104|
|      35|       176|
|      80|         3|
|      75|         6|
|      15|         1|
|      30|       210|
|      20|        12|
|      70|         1|
|      60|        19|
|      40|       155|
|      45|       117|
+--------+----------+



In [66]:
# Unir dataframes distribución edades e intervalos. Edad 15 con loan no hay 15 clientes y en total 20 clientes
age_class_distribution = age_class_counts.join(total_in_age_group, "AgeGroup")
age_class_distribution.show()

+--------+----+-----+----------+
|AgeGroup|loan|count|TotalCount|
+--------+----+-----+----------+
|      65|  no|    5|         5|
|      50|  no|   77|        95|
|      50| yes|   18|        95|
|      25| yes|   15|        95|
|      25|  no|   80|        95|
|      55|  no|   84|       104|
|      55| yes|   20|       104|
|      35|  no|  149|       176|
|      35| yes|   27|       176|
|      80|  no|    3|         3|
|      75|  no|    6|         6|
|      15|  no|    1|         1|
|      30| yes|   30|       210|
|      30|  no|  180|       210|
|      20|  no|    8|        12|
|      20| yes|    4|        12|
|      70|  no|    1|         1|
|      60|  no|   16|        19|
|      60| yes|    3|        19|
|      40| yes|   30|       155|
+--------+----+-----+----------+
only showing top 20 rows



In [67]:
# Calcular porcentaje clientes dentro de cada elemento. Edad 15 hay 20 clientes. 15 loan no y 5 loan si
age_class_distribution = age_class_distribution.withColumn(
    "Percentage",
    round((col("count") / col("TotalCount")) * 100, 2)
)
age_class_distribution.show()

+--------+----+-----+----------+----------+
|AgeGroup|loan|count|TotalCount|Percentage|
+--------+----+-----+----------+----------+
|      65|  no|    5|         5|     100.0|
|      50|  no|   77|        95|     81.05|
|      50| yes|   18|        95|     18.95|
|      25| yes|   15|        95|     15.79|
|      25|  no|   80|        95|     84.21|
|      55|  no|   84|       104|     80.77|
|      55| yes|   20|       104|     19.23|
|      35|  no|  149|       176|     84.66|
|      35| yes|   27|       176|     15.34|
|      80|  no|    3|         3|     100.0|
|      75|  no|    6|         6|     100.0|
|      15|  no|    1|         1|     100.0|
|      30| yes|   30|       210|     14.29|
|      30|  no|  180|       210|     85.71|
|      20|  no|    8|        12|     66.67|
|      20| yes|    4|        12|     33.33|
|      70|  no|    1|         1|     100.0|
|      60|  no|   16|        19|     84.21|
|      60| yes|    3|        19|     15.79|
|      40| yes|   30|       155|

In [68]:
# Ordenar
age_class_distribution = age_class_distribution.orderBy("AgeGroup", "loan")
# Resultado
print("Distribución de clientes según la edad y si tienen préstamo personal por intervalos:")
age_class_distribution.show()

Distribución de clientes según la edad y si tienen préstamo personal por intervalos:
+--------+----+-----+----------+----------+
|AgeGroup|loan|count|TotalCount|Percentage|
+--------+----+-----+----------+----------+
|      15|  no|    1|         1|     100.0|
|      20|  no|    8|        12|     66.67|
|      20| yes|    4|        12|     33.33|
|      25|  no|   80|        95|     84.21|
|      25| yes|   15|        95|     15.79|
|      30|  no|  180|       210|     85.71|
|      30| yes|   30|       210|     14.29|
|      35|  no|  149|       176|     84.66|
|      35| yes|   27|       176|     15.34|
|      40|  no|  125|       155|     80.65|
|      40| yes|   30|       155|     19.35|
|      45|  no|  100|       117|     85.47|
|      45| yes|   17|       117|     14.53|
|      50|  no|   77|        95|     81.05|
|      50| yes|   18|        95|     18.95|
|      55|  no|   84|       104|     80.77|
|      55| yes|   20|       104|     19.23|
|      60|  no|   16|        19|   

In [69]:
# Filtrar se suscribieron al depósito y default no
filtered_data = bank_df.filter((col("default") == "no") & (col("y") == "yes"))
filtered_data.show()

+---+-----------+--------+---------+-------+-------+-------+----+---------+----+-----+--------+--------+-----+--------+--------+---+--------+
|age|        job| marital|education|default|balance|housing|loan|  contact| day|month|duration|campaign|pdays|previous|poutcome|  y|AgeRange|
+---+-----------+--------+---------+-------+-------+-------+----+---------+----+-----+--------+--------+-----+--------+--------+---+--------+
| 20|    student|  single|secondary|     no|    502|     no|  no| cellular|  30|  apr|     261|       1|   -1|       0| unknown|yes|   21-25|
| 68|    retired|divorced|secondary|     no|   4189|     no|  no|telephone|  14|  jul|     897|       2|   -1|       0|    NULL|yes|   66-70|
| 32| management|  single| tertiary|     no|   2536|    yes|  no| cellular|  26|  aug|     958|       6|   -1|       0| unknown|yes|   31-35|
| 49| technician| married| tertiary|     no|   1235|     no|  no| cellular|  13|  aug|     354|       3|   -1|       0| unknown|yes|   46-50|
| 78| 

In [70]:
# Agrupar por contact y contar age <30
under30_contact = filtered_data.groupBy("contact").agg(count(when(col("age") <30, True)).alias("Under30"))
under30_contact.show()


+---------+-------+
|  contact|Under30|
+---------+-------+
|  unknown|      2|
| cellular|     18|
|telephone|      2|
+---------+-------+



In [71]:
# Método de contacto más utilizado para los clientes age<30 que se han suscrito al deposito y con default no
most_contact = under30_contact.orderBy(under30_contact["Under30"].desc()).first()


In [72]:
# Resultado
print("El contact más utilizado para los clientes con menos de 30 años con default no que se suscribieron al depósito:", most_contact["contact"])
print("Número de clientes con ese contact:", most_contact["Under30"])


El contact más utilizado para los clientes con menos de 30 años con default no que se suscribieron al depósito: cellular
Número de clientes con ese contact: 18


In [73]:
# Contar filas con nulos
num_rows_with_null = bank_df.na.drop().count()

# Resultado
print(f"El número de filas con al menos un valor nulo es: {bank_df.count() - num_rows_with_null}")


El número de filas con al menos un valor nulo es: 14


In [74]:
# Contar nulos por columnas
null_counts = [(column, bank_df.where(col(column).isNull()).count()) for column in bank_df.columns]

# Resultados
for column, count in null_counts:
    print(f"Columna '{column}' tiene {count} valor(es) nulo(s).")

Columna 'age' tiene 0 valor(es) nulo(s).
Columna 'job' tiene 0 valor(es) nulo(s).
Columna 'marital' tiene 0 valor(es) nulo(s).
Columna 'education' tiene 0 valor(es) nulo(s).
Columna 'default' tiene 0 valor(es) nulo(s).
Columna 'balance' tiene 0 valor(es) nulo(s).
Columna 'housing' tiene 0 valor(es) nulo(s).
Columna 'loan' tiene 0 valor(es) nulo(s).
Columna 'contact' tiene 0 valor(es) nulo(s).
Columna 'day' tiene 7 valor(es) nulo(s).
Columna 'month' tiene 0 valor(es) nulo(s).
Columna 'duration' tiene 0 valor(es) nulo(s).
Columna 'campaign' tiene 0 valor(es) nulo(s).
Columna 'pdays' tiene 0 valor(es) nulo(s).
Columna 'previous' tiene 0 valor(es) nulo(s).
Columna 'poutcome' tiene 7 valor(es) nulo(s).
Columna 'y' tiene 0 valor(es) nulo(s).
Columna 'AgeRange' tiene 0 valor(es) nulo(s).


In [75]:

# Completa campos vacíos con la media de la columna
imputer = Imputer(
    inputCols=['day'],
    outputCols=["{}_imputed".format(c) for c in ['day']]
    ).setStrategy("median")

# Aplicar transformación
bank_df_cleaned=imputer.fit(bank_df).transform(bank_df)

# Contar nulos por columnas
null_counts = [(column, bank_df_cleaned.where(col(column).isNull()).count()) for column in bank_df_cleaned.columns]

# Resultados
for column, count in null_counts:
    print(f"Columna '{column}' tiene {count} valor(es) nulo(s).")

Columna 'age' tiene 0 valor(es) nulo(s).
Columna 'job' tiene 0 valor(es) nulo(s).
Columna 'marital' tiene 0 valor(es) nulo(s).
Columna 'education' tiene 0 valor(es) nulo(s).
Columna 'default' tiene 0 valor(es) nulo(s).
Columna 'balance' tiene 0 valor(es) nulo(s).
Columna 'housing' tiene 0 valor(es) nulo(s).
Columna 'loan' tiene 0 valor(es) nulo(s).
Columna 'contact' tiene 0 valor(es) nulo(s).
Columna 'day' tiene 7 valor(es) nulo(s).
Columna 'month' tiene 0 valor(es) nulo(s).
Columna 'duration' tiene 0 valor(es) nulo(s).
Columna 'campaign' tiene 0 valor(es) nulo(s).
Columna 'pdays' tiene 0 valor(es) nulo(s).
Columna 'previous' tiene 0 valor(es) nulo(s).
Columna 'poutcome' tiene 7 valor(es) nulo(s).
Columna 'y' tiene 0 valor(es) nulo(s).
Columna 'AgeRange' tiene 0 valor(es) nulo(s).
Columna 'day_imputed' tiene 0 valor(es) nulo(s).


In [76]:
# Media de day
mean_age = bank_df.select(mean(bank_df['day'])).collect()[0][0]

# Nulos reemplazamos por la media
bank_df = bank_df.na.fill(mean_age, subset=['day'])

# Contar nulos por columnas
null_counts = [(column, bank_df.where(col(column).isNull()).count()) for column in bank_df.columns]

# Resultados
for column, count in null_counts:
    print(f"Columna '{column}' tiene {count} valor(es) nulo(s).")

Columna 'age' tiene 0 valor(es) nulo(s).
Columna 'job' tiene 0 valor(es) nulo(s).
Columna 'marital' tiene 0 valor(es) nulo(s).
Columna 'education' tiene 0 valor(es) nulo(s).
Columna 'default' tiene 0 valor(es) nulo(s).
Columna 'balance' tiene 0 valor(es) nulo(s).
Columna 'housing' tiene 0 valor(es) nulo(s).
Columna 'loan' tiene 0 valor(es) nulo(s).
Columna 'contact' tiene 0 valor(es) nulo(s).
Columna 'day' tiene 0 valor(es) nulo(s).
Columna 'month' tiene 0 valor(es) nulo(s).
Columna 'duration' tiene 0 valor(es) nulo(s).
Columna 'campaign' tiene 0 valor(es) nulo(s).
Columna 'pdays' tiene 0 valor(es) nulo(s).
Columna 'previous' tiene 0 valor(es) nulo(s).
Columna 'poutcome' tiene 7 valor(es) nulo(s).
Columna 'y' tiene 0 valor(es) nulo(s).
Columna 'AgeRange' tiene 0 valor(es) nulo(s).


In [77]:
# Elimina nulos y muestra
bank_df_cleaned=bank_df.na.drop()

# Elimina la fila si hay algun nulo
# All necesita todos nulos en la fila
bank_df_cleaned=bank_df.na.drop(how="any")

# Si al menos 3 campos completos, mantiene la fila
bank_df_cleaned=bank_df.na.drop(how="any",thresh=3)

# Solo elimina filas si tienen null en esa columna
bank_df_cleaned=bank_df.na.drop(how="any",subset=['poutcome'])

# Eliminar filas con poutcome null
bank_df_cleaned = bank_df.na.drop(subset=["month"])

# Completar campos vacíos de columnas con el valor que queramos
bank_df=bank_df.na.fill('unknown',['poutcome'])


In [78]:
# Reemplazar na con la media
bank_df.na.fill(bank_df.select(mean(bank_df['age'])).collect()[0][0],['age']).show()

+---+-------------+-------+---------+-------+-------+-------+----+--------+---+-----+--------+--------+-----+--------+--------+---+--------+
|age|          job|marital|education|default|balance|housing|loan| contact|day|month|duration|campaign|pdays|previous|poutcome|  y|AgeRange|
+---+-------------+-------+---------+-------+-------+-------+----+--------+---+-----+--------+--------+-----+--------+--------+---+--------+
| 30|   unemployed|married|  primary|     no|   1787|     no|  no|cellular| 19|  oct|      79|       1|   -1|       0| unknown| no|   31-35|
| 33|     services|married|secondary|     no|   4789|    yes| yes|cellular| 11|  may|     220|       1|  339|       4| failure| no|   31-35|
| 35|   management| single| tertiary|     no|   1350|    yes|  no|cellular| 16|  apr|     185|       1|  330|       1| failure| no|   36-40|
| 30|   management|married| tertiary|     no|   1476|    yes| yes| unknown|  3|  jun|     199|       4|   -1|       0| unknown| no|   31-35|
| 59|  blue-c

In [79]:
# Contar nulos por columnas
null_counts = [(column, bank_df.where(col(column).isNull()).count()) for column in bank_df.columns]

# Resultados
for column, count in null_counts:
    print(f"Columna '{column}' tiene {count} valor(es) nulo(s).")

Columna 'age' tiene 0 valor(es) nulo(s).
Columna 'job' tiene 0 valor(es) nulo(s).
Columna 'marital' tiene 0 valor(es) nulo(s).
Columna 'education' tiene 0 valor(es) nulo(s).
Columna 'default' tiene 0 valor(es) nulo(s).
Columna 'balance' tiene 0 valor(es) nulo(s).
Columna 'housing' tiene 0 valor(es) nulo(s).
Columna 'loan' tiene 0 valor(es) nulo(s).
Columna 'contact' tiene 0 valor(es) nulo(s).
Columna 'day' tiene 0 valor(es) nulo(s).
Columna 'month' tiene 0 valor(es) nulo(s).
Columna 'duration' tiene 0 valor(es) nulo(s).
Columna 'campaign' tiene 0 valor(es) nulo(s).
Columna 'pdays' tiene 0 valor(es) nulo(s).
Columna 'previous' tiene 0 valor(es) nulo(s).
Columna 'poutcome' tiene 0 valor(es) nulo(s).
Columna 'y' tiene 0 valor(es) nulo(s).
Columna 'AgeRange' tiene 0 valor(es) nulo(s).


In [80]:
# Contar filas con nulos
num_rows_with_null = bank_df.na.drop().count()

# Resultado
print(f"El número de filas con al menos un valor nulo es: {bank_df.count() - num_rows_with_null}")


El número de filas con al menos un valor nulo es: 0


In [81]:
# Índices de salida
indexers = [
    StringIndexer(inputCol=col, outputCol=col + "Index")
    for col in ["job", "marital", "education"]
]

In [82]:
# Pipeline
pipeline = Pipeline(stages=indexers)

In [83]:
# Aplicarla
bank_df_indexed = pipeline.fit(bank_df).transform(bank_df)

In [84]:
# Resultado
bank_df_indexed.select("job", "marital", "education","jobIndex", "maritalIndex", "educationIndex").show()


+-------------+-------+---------+--------+------------+--------------+
|          job|marital|education|jobIndex|maritalIndex|educationIndex|
+-------------+-------+---------+--------+------------+--------------+
|   unemployed|married|  primary|     8.0|         0.0|           2.0|
|     services|married|secondary|     4.0|         0.0|           0.0|
|   management| single| tertiary|     0.0|         1.0|           1.0|
|   management|married| tertiary|     0.0|         0.0|           1.0|
|  blue-collar|married|secondary|     1.0|         0.0|           0.0|
|   management| single| tertiary|     0.0|         1.0|           1.0|
|self-employed|married| tertiary|     5.0|         0.0|           1.0|
|   technician|married|secondary|     2.0|         0.0|           0.0|
| entrepreneur|married| tertiary|     7.0|         0.0|           1.0|
|     services|married|  primary|     4.0|         0.0|           2.0|
|     services|married|secondary|     4.0|         0.0|           0.0|
|     

In [85]:
# Índices
index_columns = ["jobIndex", "maritalIndex", "educationIndex"]

In [86]:
# Columnas a vector
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=index_columns, outputCol="features")

In [87]:
# Dataframe con columnas seleccionadas
df_assembled = assembler.transform(bank_df_indexed).select("features")

In [88]:
# Matriz de correlación
correlation_matrix = Correlation.corr(df_assembled, "features").collect()[0][0]

In [89]:

# Mostrar la matriz: 1 mucha correlacción, 0 poca
print("Matriz de correlación entre job, marital, education:")
print(correlation_matrix)


Matriz de correlación entre job, marital, education:
DenseMatrix([[1.        , 0.02219023, 0.01520319],
             [0.02219023, 1.        , 0.04566258],
             [0.01520319, 0.04566258, 1.        ]])


In [90]:
# Outliers edad
# Rango intercuartílico (IQR)
quantiles = bank_df.approxQuantile("age", [0.25, 0.75], 0.05)
Q1 = quantiles[0]
Q3 = quantiles[1]
IQR = Q3 - Q1

# Límites
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR

# Identificar outliers
outliers = bank_df.filter((col("age") < lower_bound) | (col("age") > upper_bound))

# Mostrarlos
outliers.show()

+---+----------+--------+---------+-------+-------+-------+----+---------+---+-----+--------+--------+-----+--------+--------+---+--------+
|age|       job| marital|education|default|balance|housing|loan|  contact|day|month|duration|campaign|pdays|previous|poutcome|  y|AgeRange|
+---+----------+--------+---------+-------+-------+-------+----+---------+---+-----+--------+--------+-----+--------+--------+---+--------+
| 78|   retired|divorced|  primary|     no|    229|     no|  no|telephone| 22|  oct|      97|       1|   -1|       0| unknown|yes|   76-80|
| 77|   retired|divorced| tertiary|     no|   4659|     no|  no| cellular| 14|  apr|     161|       1|   -1|       0| unknown|yes|   76-80|
| 78| housemaid| married|secondary|     no|    499|     no|  no|telephone| 16|  mar|      80|       4|   -1|       0| unknown| no|   76-80|
| 75|   retired| married|secondary|     no|   3771|     no|  no|telephone| 15|  apr|     185|       1|  181|       2| success|yes|   76-80|
| 70|   retired|divo

In [91]:
# Reemplazar los outliers en la columna 'age'
bank_df = bank_df.withColumn(
    "age",
    when(col("age") < lower_bound, lower_bound)
    .when(col("age") > upper_bound, upper_bound)
    .otherwise(col("age"))
)

In [92]:
# Volver a buscar outliers
outliers = bank_df.filter((col("age") < lower_bound) | (col("age") > upper_bound))

# Mostrarlos
outliers.show()

+---+---+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+--------+
|age|job|marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|  y|AgeRange|
+---+---+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+--------+
+---+---+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+--------+



In [93]:
# Normalizar
from pyspark.ml.feature import MinMaxScaler
# Combinar las columnas en un vector
assembler = VectorAssembler(inputCols=["age", "balance"], outputCol="features")
df_assembled = assembler.transform(bank_df)

# MinMaxScaler
scaler = MinMaxScaler(inputCol="features", outputCol="features_scaled")

# Ajustar y normalizar
scaler_model = scaler.fit(df_assembled)
df_normalizado = scaler_model.transform(df_assembled)

# Mostrar
df_normalizado.show()

+----+-------------+-------+---------+-------+-------+-------+----+--------+---+-----+--------+--------+-----+--------+--------+---+--------+-------------+--------------------+
| age|          job|marital|education|default|balance|housing|loan| contact|day|month|duration|campaign|pdays|previous|poutcome|  y|AgeRange|     features|     features_scaled|
+----+-------------+-------+---------+-------+-------+-------+----+--------+---+-----+--------+--------+-----+--------+--------+---+--------+-------------+--------------------+
|30.0|   unemployed|married|  primary|     no|   1787|     no|  no|cellular| 19|  oct|      79|       1|   -1|       0| unknown| no|   31-35|[30.0,1787.0]|[0.22448979591836...|
|33.0|     services|married|secondary|     no|   4789|    yes| yes|cellular| 11|  may|     220|       1|  339|       4| failure| no|   31-35|[33.0,4789.0]|[0.28571428571428...|
|35.0|   management| single| tertiary|     no|   1350|    yes|  no|cellular| 16|  apr|     185|       1|  330|     

In [94]:
# Estandarizar
from pyspark.ml.feature import StandardScaler
# Combinar columnas
assembler = VectorAssembler(inputCols=["age", "balance"], outputCol="features")
df_assembled = assembler.transform(bank_df_indexed)

# StandardScaler
scaler = StandardScaler(inputCol="features", outputCol="features_scaled")

# Ajustar
scaler_model = scaler.fit(df_assembled)
df_estandarizado = scaler_model.transform(df_assembled)

# Combinar categóricas convertidas y numéricas en un vector
assembler_total = VectorAssembler(inputCols=["features_scaled", "jobIndex","maritalIndex","educationIndex"], outputCol="features_total")
df_final = assembler_total.transform(df_estandarizado)

df_final.show(truncate=False)

+---+-------------+-------+---------+-------+-------+-------+----+--------+---+-----+--------+--------+-----+--------+--------+---+--------+--------+------------+--------------+-------------+----------------------------------------+-----------------------------------------------------+
|age|job          |marital|education|default|balance|housing|loan|contact |day|month|duration|campaign|pdays|previous|poutcome|y  |AgeRange|jobIndex|maritalIndex|educationIndex|features     |features_scaled                         |features_total                                       |
+---+-------------+-------+---------+-------+-------+-------+----+--------+---+-----+--------+--------+-----+--------+--------+---+--------+--------+------------+--------------+-------------+----------------------------------------+-----------------------------------------------------+
|30 |unemployed   |married|primary  |no     |1787   |no     |no  |cellular|19 |oct  |79      |1       |-1   |0       |unknown |no |31-35   

In [95]:
# Crear rdd de tuplas
data = [("Alice", 25), ("Bob", 30), ("Charlie", 35), ("Alice", 40)]
rdd = spark.sparkContext.parallelize(data)

# Ver sus datos
print("Todos los elementos del rdd: ", rdd.collect())


Todos los elementos del rdd:  [('Alice', 25), ('Bob', 30), ('Charlie', 35), ('Alice', 40)]


In [96]:
# Contar elementos
count = rdd.count()
print("Total de elementos en el rdd: ", count)


Total de elementos en el rdd:  4


In [97]:

# Obtener primer elemento
first_element = rdd.first()
print("Primer elemento del rdd: ", first_element)

# Coger 2 primeros
taken_elements = rdd.take(2)
print("Primeros dos elementos del rdd: ", taken_elements)


Primer elemento del rdd:  ('Alice', 25)
Primeros dos elementos del rdd:  [('Alice', 25), ('Bob', 30)]


In [98]:
# Definir la función para escribir en un archivo
def write_to_file(item):
    with open("/content/drive/MyDrive/Colab Notebooks/proyecto-pyspark/output.txt", "a") as f:
        f.write(item + "\n")

# Aplicar la función write_to_file a cada elemento del RDD usando foreach
rdd.foreach(lambda item: write_to_file(item[0]))

In [99]:
# Convierte nombre a mayúsculas
mapped_rdd = rdd.map(lambda x: (x[0].upper(), x[1]))

# Imprimir
result = mapped_rdd.collect()
print("rdd con mayúsculas: ", result)


rdd con mayúsculas:  [('ALICE', 25), ('BOB', 30), ('CHARLIE', 35), ('ALICE', 40)]


In [100]:
# Filtra los de edades mayores a 30
filtered_rdd = rdd.filter(lambda x: x[1] > 30)
print(filtered_rdd.collect())

[('Charlie', 35), ('Alice', 40)]


In [101]:
# Calcula suma de edades por nombre
reduced_rdd = rdd.reduceByKey(lambda x, y: x + y)
print(reduced_rdd.collect())


[('Alice', 65), ('Bob', 30), ('Charlie', 35)]


In [102]:
# Ordena por edad descendente
sorted_rdd = rdd.sortBy(lambda x: x[1], ascending=False)
print(sorted_rdd.collect())


[('Alice', 40), ('Charlie', 35), ('Bob', 30), ('Alice', 25)]


In [103]:
# Guardar en txt
rdd.saveAsTextFile("/content/drive/MyDrive/Colab Notebooks/proyecto-pyspark/output1.txt")

# Leer de text
rdd_text = spark.sparkContext.textFile("/content/drive/MyDrive/Colab Notebooks/proyecto-pyspark/output1.txt")
rdd_text.collect()


["('Charlie', 35)", "('Alice', 40)", "('Alice', 25)", "('Bob', 30)"]

In [104]:
# Convertir el RDD a DataFrame permitiendo que PySpark infiera el esquema
df = rdd.toDF(["name", "age"])
df.show()

+-------+---+
|   name|age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 35|
|  Alice| 40|
+-------+---+



In [105]:
# Convertir el DataFrame a RDD
rdd_from_df = df.rdd

# Mostrar los elementos del RDD
for row in rdd_from_df.collect():
    print(row)

Row(name='Alice', age=25)
Row(name='Bob', age=30)
Row(name='Charlie', age=35)
Row(name='Alice', age=40)


In [106]:
# Crear tabla temporal
bank_df.createOrReplaceTempView("bank")


In [107]:
# Seleccionar filas con edad superior a 25
result = spark.sql("SELECT * FROM bank WHERE age > 25")

result.show()


+----+-------------+--------+---------+-------+-------+-------+----+--------+---+-----+--------+--------+-----+--------+--------+---+--------+
| age|          job| marital|education|default|balance|housing|loan| contact|day|month|duration|campaign|pdays|previous|poutcome|  y|AgeRange|
+----+-------------+--------+---------+-------+-------+-------+----+--------+---+-----+--------+--------+-----+--------+--------+---+--------+
|30.0|   unemployed| married|  primary|     no|   1787|     no|  no|cellular| 19|  oct|      79|       1|   -1|       0| unknown| no|   31-35|
|33.0|     services| married|secondary|     no|   4789|    yes| yes|cellular| 11|  may|     220|       1|  339|       4| failure| no|   31-35|
|35.0|   management|  single| tertiary|     no|   1350|    yes|  no|cellular| 16|  apr|     185|       1|  330|       1| failure| no|   36-40|
|30.0|   management| married| tertiary|     no|   1476|    yes| yes| unknown|  3|  jun|     199|       4|   -1|       0| unknown| no|   31-35|

In [108]:
# Calcular balance medio por clase
avg_balance_by_y = spark.sql("SELECT y, AVG(balance) as avg_balance FROM bank GROUP BY y")
avg_balance_by_y.show()


+---+------------------+
|  y|       avg_balance|
+---+------------------+
| no|1452.7457627118645|
|yes|1865.0964912280701|
+---+------------------+



In [109]:
# Crear dataframes
employee_data = [
    (1, "John"), (2, "Alice"), (3, "Bob"), (4, "Emily"),
    (5, "David"), (6, "Sarah"), (7, "Michael"), (8, "Lisa"),
    (9, "William")
]
employees = spark.createDataFrame(employee_data, ["id", "name"])

salary_data = [
    ("HR", 1, 60000), ("HR", 2, 55000), ("HR", 3, 58000),
    ("IT", 4, 70000), ("IT", 5, 72000), ("IT", 6, 68000),
    ("Sales", 7, 75000), ("Sales", 8, 78000), ("Sales", 9, 77000)
]
salaries = spark.createDataFrame(salary_data, ["department", "id", "salary"])

employees.show()

salaries.show()

# Crear vistas temporales
employees.createOrReplaceTempView("employees")
salaries.createOrReplaceTempView("salaries")



+---+-------+
| id|   name|
+---+-------+
|  1|   John|
|  2|  Alice|
|  3|    Bob|
|  4|  Emily|
|  5|  David|
|  6|  Sarah|
|  7|Michael|
|  8|   Lisa|
|  9|William|
+---+-------+

+----------+---+------+
|department| id|salary|
+----------+---+------+
|        HR|  1| 60000|
|        HR|  2| 55000|
|        HR|  3| 58000|
|        IT|  4| 70000|
|        IT|  5| 72000|
|        IT|  6| 68000|
|     Sales|  7| 75000|
|     Sales|  8| 78000|
|     Sales|  9| 77000|
+----------+---+------+



In [110]:
# Buscar empleados cuyo sueldo esta debajo de la media
result = spark.sql("""
    SELECT name
    FROM employees
    WHERE id IN (
        SELECT id
        FROM salaries
        WHERE salary > (SELECT AVG(salary) FROM salaries)
    )
""")

result.show()


+-------+
|   name|
+-------+
|  Emily|
|  David|
|Michael|
|   Lisa|
|William|
+-------+



In [111]:

# Hacer join
employee_salary = spark.sql("""
    select  salaries.*, employees.name
    from salaries
    left join employees on salaries.id = employees.id
""")

employee_salary.show()


+----------+---+------+-------+
|department| id|salary|   name|
+----------+---+------+-------+
|        HR|  1| 60000|   John|
|        HR|  3| 58000|    Bob|
|        HR|  2| 55000|  Alice|
|        IT|  4| 70000|  Emily|
|     Sales|  7| 75000|Michael|
|        IT|  6| 68000|  Sarah|
|     Sales|  9| 77000|William|
|        IT|  5| 72000|  David|
|     Sales|  8| 78000|   Lisa|
+----------+---+------+-------+



In [112]:
# Crear ventana por departamentos
window_spec = Window.partitionBy("department").orderBy(F.desc("salary"))


# Ranking por departamento basandose en el salario
employee_salary.withColumn("rank", F.rank().over(window_spec)).show()


+----------+---+------+-------+----+
|department| id|salary|   name|rank|
+----------+---+------+-------+----+
|        HR|  1| 60000|   John|   1|
|        HR|  3| 58000|    Bob|   2|
|        HR|  2| 55000|  Alice|   3|
|        IT|  5| 72000|  David|   1|
|        IT|  4| 70000|  Emily|   2|
|        IT|  6| 68000|  Sarah|   3|
|     Sales|  8| 78000|   Lisa|   1|
|     Sales|  9| 77000|William|   2|
|     Sales|  7| 75000|Michael|   3|
+----------+---+------+-------+----+



In [113]:
# Comprobar si existe
view_exists = spark.catalog.tableExists("employees")
view_exists

# Eliminarla
spark.catalog.dropTempView("employees")


# Comprobar si existe
view_exists = spark.catalog.tableExists("employees")
view_exists


False

## Modelos machine learning

In [114]:
# Cargar datos
bank_df = spark.read.csv(path, header=True, inferSchema=True,sep=";")
bank_df = bank_df.dropna()

In [115]:
bank_df.show()

+---+------------+--------+---------+-------+-------+-------+----+--------+---+-----+--------+--------+-----+--------+--------+---+
|age|         job| marital|education|default|balance|housing|loan| contact|day|month|duration|campaign|pdays|previous|poutcome|  y|
+---+------------+--------+---------+-------+-------+-------+----+--------+---+-----+--------+--------+-----+--------+--------+---+
| 30|  unemployed| married|  primary|     no|   1787|     no|  no|cellular| 19|  oct|      79|       1|   -1|       0| unknown| no|
| 33|    services| married|secondary|     no|   4789|    yes| yes|cellular| 11|  may|     220|       1|  339|       4| failure| no|
| 35|  management|  single| tertiary|     no|   1350|    yes|  no|cellular| 16|  apr|     185|       1|  330|       1| failure| no|
| 30|  management| married| tertiary|     no|   1476|    yes| yes| unknown|  3|  jun|     199|       4|   -1|       0| unknown| no|
| 59| blue-collar| married|secondary|     no|      0|    yes|  no| unknown| 

In [116]:
# Convertir variables categóricas a numéricas
indexers = [
    StringIndexer(inputCol=column, outputCol=column+"_index").fit(bank_df)
    for column in ["job","marital","education","default","housing","loan","contact","month","poutcome","y"]
]
pipeline = Pipeline(stages=indexers)
bank_df = pipeline.fit(bank_df).transform(bank_df)

In [117]:
# Seleccionar las características para el modelo
feature_columns = ["job_index","marital_index","education_index","default_index","housing_index","loan_index","contact_index","month_index","poutcome_index","age","balance","day","duration","campaign","pdays","previous"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data = assembler.transform(bank_df).select("features", "y_index")

In [118]:
# Dividir los datos en 3
train_data, test_data, val_data = data.randomSplit([0.6, 0.2, 0.2], seed=42)

In [119]:
# Modelo
dt = DecisionTreeClassifier(featuresCol='features', labelCol='y_index')

In [120]:
# Cuadrícula
param_grid = ParamGridBuilder() \
    .addGrid(dt.maxDepth, [3, 5, 7]) \
    .addGrid(dt.minInfoGain, [0.0, 0.1, 0.2]) \
    .build()

In [121]:
# Evaluador
evaluator = BinaryClassificationEvaluator(labelCol='y_index')

In [122]:
# Validación cruzada
crossval = CrossValidator(estimator=dt,
                          estimatorParamMaps=param_grid,
                          evaluator=evaluator,
                          numFolds=5)

In [123]:
# Entrenar el modelo
cv_model = crossval.fit(train_data)

In [124]:
# Hacer predicciones
predictions = cv_model.transform(val_data)

# Evaluar el rendimiento
accuracy = evaluator.evaluate(predictions)

print("Accuracy:", accuracy)

Accuracy: 0.5


In [125]:
# Crear modelo
lr = LogisticRegression(featuresCol='features', labelCol='y_index')

In [126]:

# Definir cuadrícula
param_grid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 1.0]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

In [127]:
# Evaluador
evaluator = BinaryClassificationEvaluator(labelCol='y_index')

In [128]:
# Validación cruzada
crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=param_grid,
                          evaluator=evaluator,
                          numFolds=5)

In [129]:
# Entrenar el modelo
cv_model = crossval.fit(train_data)

In [130]:
# Predicciones
predictions = cv_model.transform(test_data)

# Evaluar el rendimiento
accuracy = evaluator.evaluate(predictions)

print("Accuracy:", accuracy)

Accuracy: 0.8445065176908745


In [131]:
# Crear un modelo de SVM
svm = LinearSVC(featuresCol="features", labelCol="y_index")

In [132]:
# Definir los parámetros para la validación cruzada
paramGrid = ParamGridBuilder() \
    .addGrid(svm.maxIter, [10, 20]) \
    .addGrid(svm.regParam, [0.01, 0.1, 1.0]) \
    .build()

In [133]:
# Crear un evaluador para la métrica de precisión
evaluator = BinaryClassificationEvaluator(labelCol="y_index")

In [134]:
# Configurar la validación cruzada
crossval = CrossValidator(estimator=svm,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)

In [135]:
# Entrenar el modelo
cv_model = crossval.fit(train_data)

In [136]:
# Hacer predicciones en el conjunto de prueba
predictions = cv_model.transform(test_data)

In [137]:

# Evaluar el modelo
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")

# Mostrar las predicciones
predictions.select("features", "y_index", "prediction").show()

Accuracy: 0.8289106145251404
+--------------------+-------+----------+
|            features|y_index|prediction|
+--------------------+-------+----------+
|(16,[0,1,2,7,9,11...|    0.0|       0.0|
|(16,[0,1,2,9,10,1...|    0.0|       0.0|
|(16,[0,1,4,9,11,1...|    0.0|       0.0|
|(16,[0,1,5,9,10,1...|    1.0|       0.0|
|(16,[0,1,6,9,10,1...|    0.0|       0.0|
|(16,[0,1,6,9,10,1...|    0.0|       0.0|
|(16,[0,1,6,9,10,1...|    0.0|       0.0|
|(16,[0,1,6,9,10,1...|    0.0|       0.0|
|(16,[0,1,6,9,10,1...|    0.0|       0.0|
|(16,[0,1,6,9,10,1...|    0.0|       0.0|
|(16,[0,1,6,9,10,1...|    0.0|       0.0|
|(16,[0,1,6,9,10,1...|    0.0|       0.0|
|(16,[0,1,6,9,10,1...|    0.0|       0.0|
|(16,[0,1,7,9,10,1...|    0.0|       0.0|
|(16,[0,1,7,9,10,1...|    1.0|       1.0|
|(16,[0,1,7,9,10,1...|    0.0|       0.0|
|(16,[0,1,9,10,11,...|    0.0|       0.0|
|(16,[0,1,9,10,11,...|    1.0|       0.0|
|(16,[0,1,9,10,11,...|    0.0|       0.0|
|(16,[0,2,4,7,9,11...|    0.0|       1.0|
+----

In [138]:
# Crear un modelo de GBT
gbt = GBTClassifier(featuresCol="features", labelCol="y_index")

In [139]:
# Definir los parámetros para la validación cruzada
'''paramGrid_gbt = ParamGridBuilder() \
    .addGrid(gbt.maxIter, [10, 20]) \
    .addGrid(gbt.maxDepth, [5, 10]) \
    .build()'''
paramGrid_gbt = ParamGridBuilder() \
    .addGrid(gbt.maxIter, [10]) \
    .build()

In [140]:
# Crear un evaluador para la métrica de precisión multiclase
evaluator = MulticlassClassificationEvaluator(labelCol="y_index", metricName="accuracy")

In [141]:
# Configurar la validación cruzada para GBT
crossval_gbt = CrossValidator(estimator=gbt,
                              estimatorParamMaps=paramGrid_gbt,
                              evaluator=evaluator,
                              numFolds=3)

In [None]:
# Entrenar el modelo GBT
cv_model_gbt = crossval_gbt.fit(train_data)

In [None]:
# Hacer predicciones en el conjunto de prueba
predictions_gbt = cv_model_gbt.transform(test_data)

In [None]:
# Evaluar el modelo GBT
accuracy_gbt = evaluator.evaluate(predictions_gbt)
print(f"GBT Accuracy: {accuracy_gbt}")

GBT Accuracy: 0.8916256157635468


In [None]:
# Mostrar las predicciones
predictions_gbt.select("features", "y_index", "prediction").show()

+--------------------+-------+----------+
|            features|y_index|prediction|
+--------------------+-------+----------+
|(16,[0,1,2,7,9,11...|    0.0|       0.0|
|(16,[0,1,2,9,10,1...|    0.0|       0.0|
|(16,[0,1,4,9,11,1...|    0.0|       0.0|
|(16,[0,1,5,9,10,1...|    1.0|       0.0|
|(16,[0,1,6,9,10,1...|    0.0|       0.0|
|(16,[0,1,6,9,10,1...|    0.0|       0.0|
|(16,[0,1,6,9,10,1...|    0.0|       0.0|
|(16,[0,1,6,9,10,1...|    0.0|       0.0|
|(16,[0,1,6,9,10,1...|    0.0|       0.0|
|(16,[0,1,6,9,10,1...|    0.0|       0.0|
|(16,[0,1,6,9,10,1...|    0.0|       0.0|
|(16,[0,1,6,9,10,1...|    0.0|       0.0|
|(16,[0,1,6,9,10,1...|    0.0|       0.0|
|(16,[0,1,7,9,10,1...|    0.0|       0.0|
|(16,[0,1,7,9,10,1...|    1.0|       1.0|
|(16,[0,1,7,9,10,1...|    0.0|       0.0|
|(16,[0,1,9,10,11,...|    0.0|       0.0|
|(16,[0,1,9,10,11,...|    1.0|       0.0|
|(16,[0,1,9,10,11,...|    0.0|       0.0|
|(16,[0,2,4,7,9,11...|    0.0|       1.0|
+--------------------+-------+----

In [None]:
# Crear un modelo de Random Forest
rf = RandomForestClassifier(featuresCol="features", labelCol="y_index")

In [None]:
# Definir los parámetros para la validación cruzada
paramGrid_rf = ParamGridBuilder() \
    .addGrid(rf.numTrees, [20, 50]) \
    .addGrid(rf.maxDepth, [5, 10]) \
    .build()

In [None]:
# Configurar la validación cruzada para Random Forest
crossval_rf = CrossValidator(estimator=rf,
                             estimatorParamMaps=paramGrid_rf,
                             evaluator=evaluator,
                             numFolds=3)

In [None]:
# Entrenar el modelo Random Forest
cv_model_rf = crossval_rf.fit(train_data)

In [None]:
# Hacer predicciones en el conjunto de prueba
predictions_rf = cv_model_rf.transform(test_data)

In [None]:

# Evaluar el modelo Random Forest
accuracy_rf = evaluator.evaluate(predictions_rf)
print(f"Random Forest Accuracy: {accuracy_rf}")

# Mostrar las predicciones
predictions_rf.select("features", "y_index", "prediction").show()


Random Forest Accuracy: 0.896551724137931
+--------------------+-------+----------+
|            features|y_index|prediction|
+--------------------+-------+----------+
|(16,[0,1,2,7,9,11...|    0.0|       0.0|
|(16,[0,1,2,9,10,1...|    0.0|       0.0|
|(16,[0,1,4,9,11,1...|    0.0|       0.0|
|(16,[0,1,5,9,10,1...|    1.0|       0.0|
|(16,[0,1,6,9,10,1...|    0.0|       0.0|
|(16,[0,1,6,9,10,1...|    0.0|       0.0|
|(16,[0,1,6,9,10,1...|    0.0|       0.0|
|(16,[0,1,6,9,10,1...|    0.0|       0.0|
|(16,[0,1,6,9,10,1...|    0.0|       0.0|
|(16,[0,1,6,9,10,1...|    0.0|       0.0|
|(16,[0,1,6,9,10,1...|    0.0|       0.0|
|(16,[0,1,6,9,10,1...|    0.0|       0.0|
|(16,[0,1,6,9,10,1...|    0.0|       0.0|
|(16,[0,1,7,9,10,1...|    0.0|       0.0|
|(16,[0,1,7,9,10,1...|    1.0|       0.0|
|(16,[0,1,7,9,10,1...|    0.0|       0.0|
|(16,[0,1,9,10,11,...|    0.0|       0.0|
|(16,[0,1,9,10,11,...|    1.0|       0.0|
|(16,[0,1,9,10,11,...|    0.0|       0.0|
|(16,[0,2,4,7,9,11...|    0.0|    

In [None]:
# Separar, inicializar, agrupar y ordenar
rdd = spark.sparkContext.textFile("/content/drive/MyDrive/Colab Notebooks/proyecto-pyspark/data.txt")
result_rdd = rdd.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).sortBy(lambda x: x[1], ascending=False)


Py4JJavaError: An error occurred while calling o32343.partitions.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/content/drive/MyDrive/Colab Notebooks/proyecto-pyspark/data.txt
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:304)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:244)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:332)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:208)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:294)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:290)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:294)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:290)
	at org.apache.spark.api.java.JavaRDDLike.partitions(JavaRDDLike.scala:61)
	at org.apache.spark.api.java.JavaRDDLike.partitions$(JavaRDDLike.scala:61)
	at org.apache.spark.api.java.AbstractJavaRDDLike.partitions(JavaRDDLike.scala:45)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.IOException: Input path does not exist: file:/content/drive/MyDrive/Colab Notebooks/proyecto-pyspark/data.txt
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:278)
	... 25 more


In [None]:
# Mostrar 10 mayores
print(result_rdd.take(10))

In [None]:
# Otra forma: cargar texto
df = spark.read.text("/content/drive/MyDrive/Colab Notebooks/proyecto-pyspark/data.txt")

In [None]:

# Lo mismo, separar palabras, agrupar contando y ordenar
result_df = df.selectExpr("explode(split(value, ' ')) as word") \
    .groupBy("word").count().orderBy(desc("count"))

# Mostrar 10 mayores
print(result_df.take(10))


In [None]:
# Parar sesión
spark.stop()
