# Introducción

## Encontrando la instalación de PySpark e instalando las liberías necesarias

In [1]:
!pip install findspark
!pip install pyspark
import findspark
findspark.init()



## Cómo crear una sesión en Spark

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("PySpark").config('spark.ui.port', '4050').getOrCreate()
spark

# DataFrame

## Leer un csv 

In [3]:
df_titanic = spark.read.csv('titanic.csv',header=True)

df_titanic.printSchema()

root
 |-- PassengerId: string (nullable = true)
 |-- Survived: string (nullable = true)
 |-- Pclass: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- SibSp: string (nullable = true)
 |-- Parch: string (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: string (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



## Eliminando las columnas no necesarias

In [4]:
NUMERIC_COLUMNS = ["Age","SibSp","Parch","Fare"]
ENUM_COLUMNS = ["Embarked","Pclass","Sex"]
LABEL_COLUMN = ["Survived"]

df_titanic = df_titanic.select(NUMERIC_COLUMNS + ENUM_COLUMNS + LABEL_COLUMN)
df_titanic.printSchema()

root
 |-- Age: string (nullable = true)
 |-- SibSp: string (nullable = true)
 |-- Parch: string (nullable = true)
 |-- Fare: string (nullable = true)
 |-- Embarked: string (nullable = true)
 |-- Pclass: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Survived: string (nullable = true)



## Transformando el DataFrame

In [5]:
from pyspark.sql.functions import col
def cast_to_float(input_df):
    return input_df.select([col(col_name).cast("float") if col_name in NUMERIC_COLUMNS 
                            else col(col_name)
                            for col_name in input_df.columns])

df_titanic = df_titanic.transform(cast_to_float)
df_titanic.printSchema()

root
 |-- Age: float (nullable = true)
 |-- SibSp: float (nullable = true)
 |-- Parch: float (nullable = true)
 |-- Fare: float (nullable = true)
 |-- Embarked: string (nullable = true)
 |-- Pclass: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Survived: string (nullable = true)



## Filtrando por las columnas

### Usando where

In [6]:
df_survived = df_titanic.where(col("Survived") == "1")
df_survived.show() 

+----+-----+-----+--------+--------+------+------+--------+
| Age|SibSp|Parch|    Fare|Embarked|Pclass|   Sex|Survived|
+----+-----+-----+--------+--------+------+------+--------+
|38.0|  1.0|  0.0| 71.2833|       C|     1|female|       1|
|26.0|  0.0|  0.0|   7.925|       S|     3|female|       1|
|35.0|  1.0|  0.0|    53.1|       S|     1|female|       1|
|27.0|  0.0|  2.0| 11.1333|       S|     3|female|       1|
|14.0|  1.0|  0.0| 30.0708|       C|     2|female|       1|
| 4.0|  1.0|  1.0|    16.7|       S|     3|female|       1|
|58.0|  0.0|  0.0|   26.55|       S|     1|female|       1|
|55.0|  0.0|  0.0|    16.0|       S|     2|female|       1|
|null|  0.0|  0.0|    13.0|       S|     2|  male|       1|
|null|  0.0|  0.0|   7.225|       C|     3|female|       1|
|34.0|  0.0|  0.0|    13.0|       S|     2|  male|       1|
|15.0|  0.0|  0.0|  8.0292|       Q|     3|female|       1|
|28.0|  0.0|  0.0|    35.5|       S|     1|  male|       1|
|38.0|  1.0|  5.0| 31.3875|       S|    

### Usando SQL

In [7]:
df_titanic.createOrReplaceTempView("df_titanic")
df_sql_survived = spark.sql("SELECT * FROM df_titanic WHERE survived='1'")
df_sql_survived.show()

+----+-----+-----+--------+--------+------+------+--------+
| Age|SibSp|Parch|    Fare|Embarked|Pclass|   Sex|Survived|
+----+-----+-----+--------+--------+------+------+--------+
|38.0|  1.0|  0.0| 71.2833|       C|     1|female|       1|
|26.0|  0.0|  0.0|   7.925|       S|     3|female|       1|
|35.0|  1.0|  0.0|    53.1|       S|     1|female|       1|
|27.0|  0.0|  2.0| 11.1333|       S|     3|female|       1|
|14.0|  1.0|  0.0| 30.0708|       C|     2|female|       1|
| 4.0|  1.0|  1.0|    16.7|       S|     3|female|       1|
|58.0|  0.0|  0.0|   26.55|       S|     1|female|       1|
|55.0|  0.0|  0.0|    16.0|       S|     2|female|       1|
|null|  0.0|  0.0|    13.0|       S|     2|  male|       1|
|null|  0.0|  0.0|   7.225|       C|     3|female|       1|
|34.0|  0.0|  0.0|    13.0|       S|     2|  male|       1|
|15.0|  0.0|  0.0|  8.0292|       Q|     3|female|       1|
|28.0|  0.0|  0.0|    35.5|       S|     1|  male|       1|
|38.0|  1.0|  5.0| 31.3875|       S|    

In [8]:
"""
Ejercicio
Obten todas las tuplas que han pagado menos de 10$ por embarcar en el barco. La columna en cuestión es 'Fare'
Puedes user where, una consulta SQL o ambas
"""





"\nEjercicio\nObten todas las tuplas que han pagado menos de 10$ por embarcar en el barco. La columna en cuestión es 'Fare'\nPuedes user where, una consulta SQL o ambas\n"

## Resumen estadístico del DataFrame

In [9]:
df_titanic.select(["Age","SibSp","Parch","Pclass"]).summary().show()

+-------+------------------+------------------+-------------------+------------------+
|summary|               Age|             SibSp|              Parch|            Pclass|
+-------+------------------+------------------+-------------------+------------------+
|  count|               714|               891|                891|               891|
|   mean| 29.69911764704046|0.5230078563411896|0.38159371492704824| 2.308641975308642|
| stddev|14.526497332370992|1.1027434322934315| 0.8060572211299488|0.8360712409770491|
|    min|              0.42|               0.0|                0.0|                 1|
|    25%|              20.0|               0.0|                0.0|               2.0|
|    50%|              28.0|               0.0|                0.0|               3.0|
|    75%|              38.0|               1.0|                0.0|               3.0|
|    max|              80.0|               8.0|                6.0|                 3|
+-------+------------------+---------------

In [10]:
"""
Ejercicio
Haz un resumen de todas las columnas no numéricas, ¿qué sucede?
"""

df_titanic.select(["Embarked","Pclass","Sex"]).summary().show()
df_titanic.printSchema()

+-------+--------+------------------+------+
|summary|Embarked|            Pclass|   Sex|
+-------+--------+------------------+------+
|  count|     889|               891|   891|
|   mean|    null| 2.308641975308642|  null|
| stddev|    null|0.8360712409770491|  null|
|    min|       C|                 1|female|
|    25%|    null|               2.0|  null|
|    50%|    null|               3.0|  null|
|    75%|    null|               3.0|  null|
|    max|       S|                 3|  male|
+-------+--------+------------------+------+

root
 |-- Age: float (nullable = true)
 |-- SibSp: float (nullable = true)
 |-- Parch: float (nullable = true)
 |-- Fare: float (nullable = true)
 |-- Embarked: string (nullable = true)
 |-- Pclass: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Survived: string (nullable = true)



## Agrupando columnas

In [11]:
import pyspark.sql.functions as F
df_titanic_groupby_class = df_titanic.groupby("Pclass").agg(F.count("Pclass").alias("Number of ocurrences"),F.mean("Age").alias("Average age")).orderBy("Pclass",ascending=True)
df_titanic_groupby_class.show()

+------+--------------------+------------------+
|Pclass|Number of ocurrences|       Average age|
+------+--------------------+------------------+
|     1|                 216| 38.23344086030478|
|     2|                 184|29.877630057706998|
|     3|                 491| 25.14061971827292|
+------+--------------------+------------------+



In [12]:
"""
Ejercicio
Haz lo mismo que en el ejemplo anterior, pero agrupando por el puerto donde tomó el barco
La columna que indica este dato es 'Embarked'
"""

df_titanic_groupby_embarked = df_titanic.groupby("Embarked").agg(F.count("Embarked").alias("Number of ocurrences"),F.mean("Age").alias("Average age")).orderBy("Embarked",ascending=True)
df_titanic_groupby_embarked.show()



+--------+--------------------+------------------+
|Embarked|Number of ocurrences|       Average age|
+--------+--------------------+------------------+
|    null|                   0|              50.0|
|       C|                 168| 30.81476923066836|
|       Q|                  77|28.089285714285715|
|       S|                 644| 29.44539711191336|
+--------+--------------------+------------------+



## Definiendo funciones UDF (User Defined Functions) y aplicándolas sobre una columna concreta

In [13]:
@F.udf
def substract_one(x):
    return str(int(x)-1)

df_titanic_substracted = df_titanic.withColumn("Pclass",substract_one(col("Pclass")))
df_titanic_substracted.show(10)

+----+-----+-----+-------+--------+------+------+--------+
| Age|SibSp|Parch|   Fare|Embarked|Pclass|   Sex|Survived|
+----+-----+-----+-------+--------+------+------+--------+
|22.0|  1.0|  0.0|   7.25|       S|     2|  male|       0|
|38.0|  1.0|  0.0|71.2833|       C|     0|female|       1|
|26.0|  0.0|  0.0|  7.925|       S|     2|female|       1|
|35.0|  1.0|  0.0|   53.1|       S|     0|female|       1|
|35.0|  0.0|  0.0|   8.05|       S|     2|  male|       0|
|null|  0.0|  0.0| 8.4583|       Q|     2|  male|       0|
|54.0|  0.0|  0.0|51.8625|       S|     0|  male|       0|
| 2.0|  3.0|  1.0| 21.075|       S|     2|  male|       0|
|27.0|  0.0|  2.0|11.1333|       S|     2|female|       1|
|14.0|  1.0|  0.0|30.0708|       C|     1|female|       1|
+----+-----+-----+-------+--------+------+------+--------+
only showing top 10 rows



In [14]:
"""
Ejercicio
Define una función que pase mayúsculas a minúsculas y aplicala a la columna 'Embarked'
Pista: Ten cuidado, puede haber valores nulos
"""





"\nEjercicio\nDefine una función que pase mayúsculas a minúsculas y aplicala a la columna 'Embarked'\nPista: Ten cuidado, puede haber valores nulos\n"

## Ejercicio final

In [15]:
"""
Ejercicio
Usando el Dataframe df_titanic separa en dos tablas diferentes a los hombres y mujeres (columna 'Sex').
Después obten en dos nuevas tablas el porcentaje de supervivientes por clase (columna 'Pclass') 
"""


"\nEjercicio\nUsando el Dataframe df_titanic separa en dos tablas diferentes a los hombres y mujeres (columna 'Sex').\nDespués obten en dos nuevas tablas el porcentaje de supervivientes por clase (columna 'Pclass') \n"

# RDD (Resilient Distributed Datasets)

In [16]:
sc = spark.sparkContext

In [17]:
rdd = sc.parallelize(range(1,101))
rdd

PythonRDD[56] at RDD at PythonRDD.scala:53

In [18]:
rdd.take(15)

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]

## Filtrando

In [19]:
rdd_filtered = rdd.filter(lambda x: x > 50)
rdd_filtered.take(10)

[51, 52, 53, 54, 55, 56, 57, 58, 59, 60]

In [20]:
"""
Ejercicio
Filtra el RDD original para solo quedarte con los números pares y muestra las 15 primeras ocurrencias
"""


'\nEjercicio\nFiltra el RDD original para solo quedarte con los números pares y muestra las 15 primeras ocurrencias\n'

## Aplicando funciones a todos los elementos

In [21]:
rdd_mapped = rdd.map(lambda x: x*100)
rdd_mapped.take(5)

[100, 200, 300, 400, 500]

In [22]:
"""
Ejercicio
Calcula los cuadrados de todos los números del RDD original y muestra las 8 primeras ocurrencias
"""


'\nEjercicio\nCalcula los cuadrados de todos los números del RDD original y muestra las 8 primeras ocurrencias\n'

## Obteniendo una parte aleatoria del RDD

In [23]:
rdd_fraction = rdd.sample(withReplacement=True,fraction=0.1)
rdd_fraction.take(10)

[1, 5, 5, 11, 26, 31, 31, 37, 38]

In [24]:
"""
Ejercicio
Obten aproximadamente el 5% de elementos del RDD sin que puedan repetirse aleatoriamente 
"""




'\nEjercicio\nObten aproximadamente el 5% de elementos del RDD sin que puedan repetirse aleatoriamente \n'

## Reduciendo por clave

In [25]:
rdd_groupby_remainder_of_seven = [(x % 7, x) for x in range(1,101)]
rdd_groupby_remainder_of_seven = sc.parallelize(
    rdd_groupby_remainder_of_seven
)
rdd_groupby_remainder_of_seven.take(7)

[(1, 1), (2, 2), (3, 3), (4, 4), (5, 5), (6, 6), (0, 7)]

In [26]:
rdd_groupby_sum = rdd_groupby_remainder_of_seven.reduceByKey(
    lambda accu, value: accu+value
)
[element for element in rdd_groupby_sum.collect()]

[(1, 750), (2, 765), (3, 679), (4, 693), (5, 707), (6, 721), (0, 735)]

In [27]:
"""
Ejercicio
Haz el mismo proceso anterior, 
pero ahora con los restos de 11 desde 100 hasta 200 
y usando la multiplicación en vez de la suma en la reducción por clave
"""



'\nEjercicio\nHaz el mismo proceso anterior, \npero ahora con los restos de 11 desde 100 hasta 200 \ny usando la multiplicación en vez de la suma en la reducción por clave\n'

## Reduciendo el RDD a un único valor

In [28]:
mult_value = rdd.fold(1,
    lambda accu, value: accu*value
)
mult_value

93326215443944152681699238856266700490715968264381621468592963895217599993229915608941463976156518286253697920827223758251185210916864000000000000000000000000

In [29]:
"""
Ejercicio
Obten la suma de todos los elementos del RDD
Pista: El valor final debe ser 5050
"""





'\nEjercicio\nObten la suma de todos los elementos del RDD\nPista: El valor final debe ser 5050\n'

## Ejercicio final

In [62]:
from random import choice, randint
from string import ascii_lowercase as letters
data = [
    (f"{choice(letters)}{choice(letters)}", randint(1,10)) for _ in range(10000)
]
data[:10]

[('jt', 8),
 ('gf', 9),
 ('qb', 7),
 ('bo', 3),
 ('hj', 9),
 ('np', 7),
 ('so', 8),
 ('wn', 1),
 ('aj', 6),
 ('vw', 4)]

In [63]:
"""
Ejercicio
En los datos anteriores eliminar todas las claves 
cuya primera letra vaya después alfabéticamente que la segunda letra.
Después dividir todos los valores entre la suma total de esos mismos valores.
Finalmente, calcular la suma de los valores agrupando por la clave
"""
data = sc.parallelize(data)
data = data.filter(lambda x: x[0][0] <= x[0][1])
sum_data = data.map(lambda x: x[1]).fold(0,lambda accu, value: accu + value)
data = data.map(lambda x: (x[0], x[1] / sum_data))
data = data.reduceByKey(lambda accu, value: accu + value)
data = data

data.take(10)

[('jt', 0.004597370878528841),
 ('bo', 0.003627612958839164),
 ('hj', 0.0018676819194023422),
 ('np', 0.003950865598735723),
 ('aj', 0.003412111198908124),
 ('vw', 0.002657855039149487),
 ('gh', 0.0030529415990230586),
 ('is', 0.0033761942389196175),
 ('er', 0.0029811076790460453),
 ('jm', 0.0011852596796207168)]