# PySpark main functions

## Descargar dataset Titanic

In [None]:
import requests

url = "https://raw.githubusercontent.com/datasciencedojo/datasets/master/titanic.csv"
response = requests.get(url)

with open("data/titanic.csv", "wb") as file:
    file.write(response.content)

print("Archivo descargado correctamente")

# Crear sesión PySpark

In [1]:
import findspark
findspark.init()  # Ayuda a encontrar Spark si no está en el PATH
from pyspark.sql import SparkSession
# Crear una sesión de Spark
spark = SparkSession.builder.appName("TitanicPySpark").getOrCreate()

25/03/24 12:50:26 WARN Utils: Your hostname, MacBook-Pro-de-Alain.local resolves to a loopback address: 127.0.0.1; using 192.168.1.143 instead (on interface en0)
25/03/24 12:50:26 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/24 12:50:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Cargar dataset

In [2]:
df = spark.read.csv("data/titanic.csv", header=True, inferSchema=True)
df.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| NULL|       S|
|          6|       0|     3|    Moran, Mr. James|  male|NULL|    0|    0|      

# Mostrar esquema dataset

In [4]:
df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



# Seleccionar columnas

In [5]:
df.select("Name", "Age", "Sex", "Survived").show(5)


+--------------------+----+------+--------+
|                Name| Age|   Sex|Survived|
+--------------------+----+------+--------+
|Braund, Mr. Owen ...|22.0|  male|       0|
|Cumings, Mrs. Joh...|38.0|female|       1|
|Heikkinen, Miss. ...|26.0|female|       1|
|Futrelle, Mrs. Ja...|35.0|female|       1|
|Allen, Mr. Willia...|35.0|  male|       0|
+--------------------+----+------+--------+
only showing top 5 rows



# Filtrar datos

### Pasajeros mayores de 30 años

In [6]:
df.filter(df["Age"] > 30).show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+--------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|  Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+--------+-------+-----+--------+
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|PC 17599|71.2833|  C85|       C|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|  113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|  373450|   8.05| NULL|       S|
|          7|       0|     1|McCarthy, Mr. Tim...|  male|54.0|    0|    0|   17463|51.8625|  E46|       S|
|         12|       1|     1|Bonnell, Miss. El...|female|58.0|    0|    0|  113783|  26.55| C103|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+--------+-------+-----+--------+
only showing top 5 rows



### Mujeres que sobrevivierosn

In [8]:
df.filter((df['Sex']=='female') & (df['Survived']==1)).show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+--------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|    Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+--------+-----+--------+
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599| 71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|   7.925| NULL|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|    53.1| C123|       S|
|          9|       1|     3|Johnson, Mrs. Osc...|female|27.0|    0|    2|          347742| 11.1333| NULL|       S|
|         10|       1|     2|Nasser, Mrs. Nich...|female|14.0|    1|    0|          237736| 30.0708| NULL|       C|
|         11|       1|     3|Sandstrom, Miss. ...|female| 4.0|    1|    

# Ordenar datos

### Ordenar por edad

In [12]:
df.orderBy('Age').show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+--------+--------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|  Ticket|    Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+--------+--------+-----+--------+
|         29|       1|     3|"O'Dwyer, Miss. E...|female|NULL|    0|    0|  330959|  7.8792| NULL|       Q|
|         43|       0|     3| Kraeff, Mr. Theodor|  male|NULL|    0|    0|  349253|  7.8958| NULL|       C|
|         30|       0|     3| Todoroff, Mr. Lalio|  male|NULL|    0|    0|  349216|  7.8958| NULL|       S|
|         27|       0|     3|Emir, Mr. Farred ...|  male|NULL|    0|    0|    2631|   7.225| NULL|       C|
|         32|       1|     1|Spencer, Mrs. Wil...|female|NULL|    1|    0|PC 17569|146.5208|  B78|       C|
+-----------+--------+------+--------------------+------+----+-----+-----+--------+--------+-----+--------+
only showing top 5 rows



### Ordenar por tarifa descendente

In [14]:
df.orderBy(df['Fare'].desc()).show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+--------+--------+-----------+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|  Ticket|    Fare|      Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+--------+--------+-----------+--------+
|        259|       1|     1|    Ward, Miss. Anna|female|35.0|    0|    0|PC 17755|512.3292|       NULL|       C|
|        680|       1|     1|Cardeza, Mr. Thom...|  male|36.0|    0|    1|PC 17755|512.3292|B51 B53 B55|       C|
|        738|       1|     1|Lesurer, Mr. Gust...|  male|35.0|    0|    0|PC 17755|512.3292|       B101|       C|
|         89|       1|     1|Fortune, Miss. Ma...|female|23.0|    3|    2|   19950|   263.0|C23 C25 C27|       S|
|         28|       0|     1|Fortune, Mr. Char...|  male|19.0|    3|    2|   19950|   263.0|C23 C25 C27|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+--------+-----

# Agrupar y contar datos

In [15]:
df.groupBy('Sex').sum('Survived').show()

+------+-------------+
|   Sex|sum(Survived)|
+------+-------------+
|female|          233|
|  male|          109|
+------+-------------+



# Estadísticas

In [17]:
df.describe().show()

25/03/24 12:58:34 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|summary|      PassengerId|           Survived|            Pclass|                Name|   Sex|               Age|             SibSp|              Parch|            Ticket|             Fare|Cabin|Embarked|
+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|  count|              891|                891|               891|                 891|   891|               714|               891|                891|               891|              891|  204|     889|
|   mean|            446.0| 0.3838383838383838| 2.308641975308642|                NULL|  NULL| 29.69911764705882|0.5230078563411896|0.38159371492704824|260318.54916792738| 32.20420

# Agregar y modificar columnas

In [19]:
from pyspark.sql.functions import when

df = df.withColumn('Menor_Edad', when(df['Age'] < 18, 'Si').otherwise('No'))

df.select('Name', 'Age', 'Menor_Edad').show(5)

+--------------------+----+----------+
|                Name| Age|Menor_Edad|
+--------------------+----+----------+
|Braund, Mr. Owen ...|22.0|        No|
|Cumings, Mrs. Joh...|38.0|        No|
|Heikkinen, Miss. ...|26.0|        No|
|Futrelle, Mrs. Ja...|35.0|        No|
|Allen, Mr. Willia...|35.0|        No|
+--------------------+----+----------+
only showing top 5 rows



# Manejo de valores nulos

### Ver cuántos valores nulos hay por columna

In [None]:
from pyspark.sql.functions import col, isnan, count

df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()


### Llenar valores nulos en la columna "Age" con la media de la edad

In [20]:
from pyspark.sql.functions import avg

mean_age = df.select(avg("Age")).collect()[0][0]
df = df.fillna({"Age": mean_age})


# Consultas SQL

In [None]:
df.createOrReplaceTempView("titanic")

# 📌 ¿Cuál es la edad promedio por clase?
spark.sql("SELECT Pclass, AVG(Age) AS PromedioEdad FROM titanic GROUP BY Pclass").show()

+------+------------------+
|Pclass|      PromedioEdad|
+------+------------------+
|     1| 37.04811819172115|
|     3|26.403258655804464|
|     2|  29.8669581202046|
+------+------------------+



25/03/24 18:44:31 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 239919 ms exceeds timeout 120000 ms
25/03/24 18:44:31 WARN SparkContext: Killing executors is not supported by current scheduler.
25/03/24 18:44:35 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$