# SparkSQL

In [1]:
spark

## Creación de DataFrames

In [2]:
l = [('a',3.14), ('b', 9.4), ('a',2.7)]
headers = ['id','value']
df = spark.createDataFrame(l, headers)
df.show()
df.printSchema()

+---+-----+
| id|value|
+---+-----+
|  a| 3.14|
|  b|  9.4|
|  a|  2.7|
+---+-----+

root
 |-- id: string (nullable = true)
 |-- value: double (nullable = true)



In [3]:
l = [('a',3.14), ('b',True)]
headers = ['id','value']
## df = spark.createDataFrame(l, headers) 
## Esta invocación lanzaría una excepción 
## TypeError: field value: Can not merge type <class 'pyspark.sql.types.DoubleType'> and <class 'pyspark.sql.types.BooleanType'>

In [4]:
from pyspark.sql.types import StructType, StructField, StringType, FloatType

l = [('a',3.14), ('b', 9.4), (None, 5.7)]
schema = StructType([
  StructField('id',   StringType(), True),
  StructField('value', FloatType(), False)
])
df = spark.createDataFrame(l, schema)
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- value: float (nullable = false)



In [5]:
from pyspark.sql.types import StructType, StructField, StringType, FloatType

l = [('a',3.14), ('b', 9.4), ('a', True)]
schema = StructType([
  StructField('id',   StringType(), True),
  StructField('value', FloatType(), False)
])
## df = spark.createDataFrame(l, schema) ## Lanza excepción
## TypeError: field value: FloatType can not accept object True in type <class 'bool'>

In [6]:
r = sc.parallelize([('a',3.14), ('b', 9.4), ('a', 2.7)])
df = spark.createDataFrame(r, ['id','value'])
df.show()
df.printSchema()

+---+-----+
| id|value|
+---+-----+
|  a| 3.14|
|  b|  9.4|
|  a|  2.7|
+---+-----+

root
 |-- id: string (nullable = true)
 |-- value: double (nullable = true)



In [7]:
# Creación de un DataFrame a partir de un RDD
import csv

raw = (
  sc.textFile("../../data/Cap7/titanic.csv") # Leemos el fichero de texto
    .map(lambda s: list(csv.reader([s]))[0]) # Dividimos el CSV en listas
    .filter(lambda l: l[0] != 'PassengerId') # Eliminamos la cabecera
)

df = spark.createDataFrame(raw)
df.show(3)
df.printSchema()
headers = ['PassengerId','Survived','Pclass','Name','Sex','Age','SibSp','Parch','Ticket','Fare','Cabin','Embarked']
df = spark.createDataFrame(raw, headers)
df.show(3)
df.printSchema()

+---+---+---+--------------------+------+---+---+---+----------------+-------+---+---+
| _1| _2| _3|                  _4|    _5| _6| _7| _8|              _9|    _10|_11|_12|
+---+---+---+--------------------+------+---+---+---+----------------+-------+---+---+
|  1|  0|  3|Braund, Mr. Owen ...|  male| 22|  1|  0|       A/5 21171|   7.25|   |  S|
|  2|  1|  1|Cumings, Mrs. Joh...|female| 38|  1|  0|        PC 17599|71.2833|C85|  C|
|  3|  1|  3|Heikkinen, Miss. ...|female| 26|  0|  0|STON/O2. 3101282|  7.925|   |  S|
+---+---+---+--------------------+------+---+---+---+----------------+-------+---+---+
only showing top 3 rows

root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: string (nullable = true)
 |-- _4: string (nullable = true)
 |-- _5: string (nullable = true)
 |-- _6: string (nullable = true)
 |-- _7: string (nullable = true)
 |-- _8: string (nullable = true)
 |-- _9: string (nullable = true)
 |-- _10: string (nullable = true)
 |-- _11: string (nu

## Lectura y escritura de DataFrames desde ficheros

In [8]:
df = spark.read.csv('../../data/Cap7/titanic.csv', header=True, inferSchema=True)
df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [9]:
df = spark.read.json('../../data/Cap7/tweets.json')
df.printSchema()
df.show()

root
 |-- RT_count: long (nullable = true)
 |-- text: string (nullable = true)
 |-- user: struct (nullable = true)
 |    |-- followers_count: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- verified: boolean (nullable = true)

+--------+--------------+----------------+
|RT_count|          text|            user|
+--------+--------------+----------------+
|       2|   #Tengosueño| [3, Pepe, true]|
|      45|  #VivaElLunes|[15, Ana, false]|
|     100|¡Gol de Señor!|  [2, Eva, true]|
+--------+--------------+----------------+



In [10]:
## Lee todos los ficheros, y cada línea es una fila del DataFrame.
## El DataFrame tiene una única columna 'value' de tipo string
df = spark.read.text('../../data/Cap7/*.txt')
df.printSchema()
df.show()

root
 |-- value: string (nullable = true)

+--------------------+
|               value|
+--------------------+
|En el anterior ap...|
|y que se crean me...|
|antes de poder co...|
|un RDD (o varios)...|
|Estos RDDs se cre...|
|o a partir de fic...|
|creará un RDD y s...|
|diferentes proces...|
|En el anterior ap...|
|y que se crean me...|
|antes de poder co...|
|un RDD (o varios)...|
|Estos RDDs se cre...|
|o a partir de fic...|
|creará un RDD y s...|
|diferentes proces...|
+--------------------+



In [11]:
## Lee todos los ficheros, y cada ficheros es una fila del DataFrame
## El DataFrame tiene una única columna 'value' de tipo string
df = spark.read.text('../../data/Cap7/*.txt', wholetext=True)
df.printSchema()
df.show()

root
 |-- value: string (nullable = true)

+--------------------+
|               value|
+--------------------+
|En el anterior ap...|
|En el anterior ap...|
+--------------------+



In [12]:
# Escritura de un DataFrame de 50 elementos en la carpeta /tmp/csv
l = [('a',3.14)] * 50
df = spark.createDataFrame(l, ['id','value'])
df.printSchema()
df.write.csv('/tmp/csv', header=True, mode='overwrite')

root
 |-- id: string (nullable = true)
 |-- value: double (nullable = true)



In [13]:
# Lectura del DataFrame anterior desde la carpeta /tmp/csv
df = spark.read.csv('/tmp/csv', header=True, inferSchema=True)
df.printSchema()
df.show(4)
df.count()

root
 |-- id: string (nullable = true)
 |-- value: double (nullable = true)

+---+-----+
| id|value|
+---+-----+
|  a| 3.14|
|  a| 3.14|
|  a| 3.14|
|  a| 3.14|
+---+-----+
only showing top 4 rows



50

In [14]:
l = [('a',3.14)] * 50
df = spark.createDataFrame(l, ['id','value'])
df.printSchema()
print(df.rdd.getNumPartitions())
df.write.json('/tmp/json', mode='overwrite')

root
 |-- id: string (nullable = true)
 |-- value: double (nullable = true)

8


In [15]:
# Lectura del DataFrame anterior desde la carpeta /tmp/json
df = spark.read.json('/tmp/json')
df.printSchema()
df.show(4)
df.count()

root
 |-- id: string (nullable = true)
 |-- value: double (nullable = true)

+---+-----+
| id|value|
+---+-----+
|  a| 3.14|
|  a| 3.14|
|  a| 3.14|
|  a| 3.14|
+---+-----+
only showing top 4 rows



50

In [16]:
# Volcar un DataFrame a Pandas
l = [('a',3.14)] * 12
df = spark.createDataFrame(l, ['id','value'])
df.toPandas()

Unnamed: 0,id,value
0,a,3.14
1,a,3.14
2,a,3.14
3,a,3.14
4,a,3.14
5,a,3.14
6,a,3.14
7,a,3.14
8,a,3.14
9,a,3.14


## DataFrames y MongoDB

Para poder cargar y salvar DataFrames en MongoDB es necesario lanzar ```pyspark``` configurando el conector de Mongo-Spark. Para ello es necesario pasar el siguiente parámetro a la hora de invocar a ```pyspark```:
```
pyspark --packages org.mongodb.spark:mongo-spark-connector_2.11:2.2.2 --master local[*]
```
        
El nombre del conector corresponde a _coordenadas_ Maven:
 * **2.11** porque el conector utiliza la versión 2.11 de Scala
 * **2.2.2** es la versión concreta del conector. La versión 2.2.2 soporta MongoDB 2.2.x y 2.3.x
 
El parámetro ```--master``` es el usual. En este caso lanzamos ```pyspark``` en modo local y utilizando tantos procesos _workers_ como núcleos tenga nuestra CPU.

Se puede encontrar más información en:
 * https://docs.mongodb.com/spark-connector/master/
 * https://docs.mongodb.com/spark-connector/master/python-api/
 
**Supondremos que existe un servidor MongoDB ejecutándose en local (IP 127.0.0.1) y escuchando en el puerto por defecto (27017).**

In [17]:
# Volcar un DataFrame a una colección MongoDB
ciudades = spark.createDataFrame([("Madrid",  3182981), ("Barcelona", 1620809), ("Valencia",787808),
                                  ("Sevilla", 689434), ("Zaragoza", 664938), ("Málaga",569002), 
                                  ("Murcia",443243), ("Palma",406492) ], ["nombre", "habitantes"])
ciudades.show()
ciudades.printSchema()

(ciudades.write.format("com.mongodb.spark.sql.DefaultSource")
              .option("uri","mongodb://127.0.0.1/test.ciudades").save()
)
# *Importante*: La colección test.ciudades no debe existir. Si lo que queremos es añadir a una colección ya existente, debemos usar mode("append"):
### ciudades.write.format("com.mongodb.spark.sql.DefaultSource").option("uri","mongodb://127.0.0.1/test.ciudades").mode("append").save()

+---------+----------+
|   nombre|habitantes|
+---------+----------+
|   Madrid|   3182981|
|Barcelona|   1620809|
| Valencia|    787808|
|  Sevilla|    689434|
| Zaragoza|    664938|
|   Málaga|    569002|
|   Murcia|    443243|
|    Palma|    406492|
+---------+----------+

root
 |-- nombre: string (nullable = true)
 |-- habitantes: long (nullable = true)



In [18]:
# Cargar un DataFrame desde una colección MongoDB
df = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri","mongodb://127.0.0.1/test.ciudades").load()
df.show()
df.printSchema()

+--------------------+----------+---------+
|                 _id|habitantes|   nombre|
+--------------------+----------+---------+
|[5baa6d4ac68b5f61...|   1620809|Barcelona|
|[5baa6d4ac68b5f61...|    664938| Zaragoza|
|[5baa6d4ac68b5f61...|    787808| Valencia|
|[5baa6d4ac68b5f61...|    689434|  Sevilla|
|[5baa6d4ac68b5f61...|    569002|   Málaga|
|[5baa6d4ac68b5f61...|    443243|   Murcia|
|[5baa6d4ac68b5f61...|    406492|    Palma|
|[5baa6d4ac68b5f61...|   3182981|   Madrid|
+--------------------+----------+---------+

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- habitantes: long (nullable = true)
 |-- nombre: string (nullable = true)



In [19]:
# Cargar un DataFrame desde una colección MongoDB heterogénea (distintos documentos tienen atributos diferentes)
from pymongo import MongoClient

# Borramos la colección test.usuarios e insertamos dos documentos heterogéneos
client = MongoClient('127.0.0.1')
col = client['test']['usuarios']
col.drop()
col.insert_many([{'nombre':'ana', 'edad':33}, {'nombre':'pedro','altura':150}])

# Cargamos un DataFrame a partir de la colección test.usuarios
df = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri","mongodb://127.0.0.1/test.usuarios").load()
df.show()
df.printSchema()

+--------------------+------+----+------+
|                 _id|altura|edad|nombre|
+--------------------+------+----+------+
|[5baa6d4a09fcae61...|  null|  33|   ana|
|[5baa6d4a09fcae61...|   150|null| pedro|
+--------------------+------+----+------+

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- altura: integer (nullable = true)
 |-- edad: integer (nullable = true)
 |-- nombre: string (nullable = true)



In [20]:
pipeline = "{'$match': {'habitantes': {$gt:500000}}}"
masde500mil = (spark.read.format("com.mongodb.spark.sql.DefaultSource")
                        .option("uri","mongodb://127.0.0.1/test.ciudades")
                        .option("pipeline", pipeline).load()
              )
masde500mil.show()

+--------------------+----------+---------+
|                 _id|habitantes|   nombre|
+--------------------+----------+---------+
|[5baa6d4ac68b5f61...|   1620809|Barcelona|
|[5baa6d4ac68b5f61...|    664938| Zaragoza|
|[5baa6d4ac68b5f61...|    787808| Valencia|
|[5baa6d4ac68b5f61...|    689434|  Sevilla|
|[5baa6d4ac68b5f61...|    569002|   Málaga|
|[5baa6d4ac68b5f61...|   3182981|   Madrid|
+--------------------+----------+---------+



## Inspección de DataFrames

In [21]:
# 
l = [('a',3.14)] * 12
df = spark.createDataFrame(l, ['id','value'])
df.describe().show()
type(df.printSchema())

+-------+----+-----+
|summary|  id|value|
+-------+----+-----+
|  count|  12|   12|
|   mean|null| 3.14|
| stddev|null|  0.0|
|    min|   a| 3.14|
|    max|   a| 3.14|
+-------+----+-----+

root
 |-- id: string (nullable = true)
 |-- value: double (nullable = true)



NoneType

In [22]:
l = [('a',3.14), ('b',2.0), ('c',4.5)]
df = spark.createDataFrame(l, ['id','value'])
df.count()

3

In [23]:
df = spark.read.csv('../../data/Cap7/titanic.csv', header=True, inferSchema=True)
df.describe(['Age','Fare','Sex','Cabin']).show()

+-------+------------------+-----------------+------+-----+
|summary|               Age|             Fare|   Sex|Cabin|
+-------+------------------+-----------------+------+-----+
|  count|               714|              891|   891|  204|
|   mean| 29.69911764705882| 32.2042079685746|  null| null|
| stddev|14.526497332334035|49.69342859718089|  null| null|
|    min|              0.42|              0.0|female|  A10|
|    max|              80.0|         512.3292|  male|    T|
+-------+------------------+-----------------+------+-----+



## Filtrado de DataFrames

In [24]:
## Eliminación de columnas con drop()
titanic = spark.read.csv('../../data/Cap7/titanic.csv', header=True, inferSchema=True)
print(titanic.columns)
df2 = titanic.drop('PassengerId','Name','Cabin')
print(df2.columns)
df3 = titanic.drop(titanic.PassengerId)
print(df3.columns)
df4 = titanic.drop(titanic['PassengerId'])
print(df4.columns)

['PassengerId', 'Survived', 'Pclass', 'Name', 'Sex', 'Age', 'SibSp', 'Parch', 'Ticket', 'Fare', 'Cabin', 'Embarked']
['Survived', 'Pclass', 'Sex', 'Age', 'SibSp', 'Parch', 'Ticket', 'Fare', 'Embarked']
['Survived', 'Pclass', 'Name', 'Sex', 'Age', 'SibSp', 'Parch', 'Ticket', 'Fare', 'Cabin', 'Embarked']
['Survived', 'Pclass', 'Name', 'Sex', 'Age', 'SibSp', 'Parch', 'Ticket', 'Fare', 'Cabin', 'Embarked']


In [25]:
## Selección de columnas con select()
titanic = spark.read.csv('../../data/Cap7/titanic.csv', header=True, inferSchema=True)
print(titanic.columns)
df2 = titanic.select('Survived', 'Pclass', 'Age')
print(df2.columns)

['PassengerId', 'Survived', 'Pclass', 'Name', 'Sex', 'Age', 'SibSp', 'Parch', 'Ticket', 'Fare', 'Cabin', 'Embarked']
['Survived', 'Pclass', 'Age']


In [26]:
titanic = spark.read.csv('../../data/Cap7/titanic.csv', header=True, inferSchema=True)
print(titanic.count())
df = titanic.dropDuplicates()
print(df.count())
df = titanic.dropDuplicates(['Sex'])
print(df.count())

891
891
2


In [27]:
titanic = spark.read.csv('../../data/Cap7/titanic.csv', header=True, inferSchema=True)
print(titanic.count())
df = titanic.dropna()
print(df.count())
df = titanic.drop('Cabin').dropna()
print(df.count())

891
183
712


In [28]:
titanic = spark.read.csv('../../data/Cap7/titanic.csv', header=True, inferSchema=True)
df = titanic.filter( 'Survived = 1')
print(df.count())
df = titanic.filter( df.Survived == 1)
print(df.count())

342
342


In [29]:
df = titanic.filter( 'Survived = 1 AND Sex = "female" AND Age > 20')
print(df.count())
df = titanic.filter( (df.Survived == 1) & (df['Sex'] == 'female') & (df.Age > 20))
print(df.count())

144
144


In [30]:
# Filas cuyo camarote tiene 3 caracteres, el primero es una A o una B mayúsculas y el último un 2
# Usa una expresión regular
df = titanic.filter('Cabin RLIKE "^[AB].2$"')
print(df.count())
df = titanic.filter(df.Cabin.rlike('^[AB].2$'))
print(df.count())

4
4


## Combinación de DataFrames

In [31]:
df1 = spark.createDataFrame([(1,'ana'),(2,'jose')],['id','nombre'])
df2 = spark.createDataFrame([(3,'marta'),(1,'ana')],['id','nombre'])
df = df1.union(df2)
df.show()

+---+------+
| id|nombre|
+---+------+
|  1|   ana|
|  2|  jose|
|  3| marta|
|  1|   ana|
+---+------+



In [32]:
df1 = spark.createDataFrame([(1,'ana'),(2,'jose')],['id','nombre'])
df1.printSchema()
df2 = spark.createDataFrame([(3.0,'marta'),(1.0,'ana')],['id','nombre'])
df2.printSchema()
df = df1.union(df2) ## No hay problema: long y double son compatibles => double
df.show()
df.printSchema()

root
 |-- id: long (nullable = true)
 |-- nombre: string (nullable = true)

root
 |-- id: double (nullable = true)
 |-- nombre: string (nullable = true)

+---+------+
| id|nombre|
+---+------+
|1.0|   ana|
|2.0|  jose|
|3.0| marta|
|1.0|   ana|
+---+------+

root
 |-- id: double (nullable = true)
 |-- nombre: string (nullable = true)



In [33]:
df1 = spark.createDataFrame([(1,'ana'),(2,'jose')],['id','nombre'])
df1.printSchema()
df2 = spark.createDataFrame([('3','marta'),('1','ana')],['id','nombre'])
df2.printSchema()
df = df1.union(df2) ## No hay problema: long y string son compatibles => string
df.show()
df.printSchema()

root
 |-- id: long (nullable = true)
 |-- nombre: string (nullable = true)

root
 |-- id: string (nullable = true)
 |-- nombre: string (nullable = true)

+---+------+
| id|nombre|
+---+------+
|  1|   ana|
|  2|  jose|
|  3| marta|
|  1|   ana|
+---+------+

root
 |-- id: string (nullable = true)
 |-- nombre: string (nullable = true)



In [34]:
df1 = spark.createDataFrame([(1,'ana'),(2,'jose')],['id','nombre'])
df1.printSchema()
df2 = spark.createDataFrame([(True,'marta'),(False,'ana')],['id','nombre'])
df2.printSchema()
## df = df1.union(df2) ## Excepcion
##AnalysisException: "Union can only be performed on tables with the compatible column types. boolean <> bigint at the first column of the second table

root
 |-- id: long (nullable = true)
 |-- nombre: string (nullable = true)

root
 |-- id: boolean (nullable = true)
 |-- nombre: string (nullable = true)



In [35]:
df1 = spark.createDataFrame([(1,'ana'),(2,'jose')],['id','nombre'])
df1.printSchema()
df2 = spark.createDataFrame([(3,'marta',33),(4,'señor',44)],['id','nombre','edad'])
df2.printSchema()
## df = df1.union(df2)  ## Excepción
## AnalysisException: "Union can only be performed on tables with the same number of columns, but the first table has 2 columns and the second table has 3 columns

root
 |-- id: long (nullable = true)
 |-- nombre: string (nullable = true)

root
 |-- id: long (nullable = true)
 |-- nombre: string (nullable = true)
 |-- edad: long (nullable = true)



In [36]:
df1 = spark.createDataFrame([(1,'ana'),(2,'jose')],['id','nombre'])
df2 = spark.createDataFrame([(3,'marta'),(1,'ana')],['id','nombre'])
df = df1.intersect(df2)
df.show()

+---+------+
| id|nombre|
+---+------+
|  1|   ana|
+---+------+



In [37]:
df1 = spark.createDataFrame([(1,'ana'),(2,'jose')],['id','nombre'])
df2 = spark.createDataFrame([(3,'marta'),(1,'ana')],['id','nombre'])
df = df1.subtract(df2)
df.show()

+---+------+
| id|nombre|
+---+------+
|  2|  jose|
+---+------+



In [38]:
## inner join de dos DataFrames sobre la columna 'id'
users = spark.createDataFrame([(1,'ana'),(2,'jose')],['id','nombre'])
users.show()
age = spark.createDataFrame([(1,36),(2,30)],['id','edad'])
age.show()
df = users.join(age,'id')
df.show()

+---+------+
| id|nombre|
+---+------+
|  1|   ana|
|  2|  jose|
+---+------+

+---+----+
| id|edad|
+---+----+
|  1|  36|
|  2|  30|
+---+----+

+---+------+----+
| id|nombre|edad|
+---+------+----+
|  1|   ana|  36|
|  2|  jose|  30|
+---+------+----+



In [39]:
## Inner join donde la columna 'id' contiene enteros y cadenas de texto
users = spark.createDataFrame([('1','ana'),('2','jose')],['id','nombre'])
users.printSchema()
age = spark.createDataFrame([(1,36),(2,30)],['id','edad'])
age.printSchema()
df = users.join(age,'id') ## No hay problema: string y long son compatibles => string
df.printSchema()
df.show()

root
 |-- id: string (nullable = true)
 |-- nombre: string (nullable = true)

root
 |-- id: long (nullable = true)
 |-- edad: long (nullable = true)

root
 |-- id: string (nullable = true)
 |-- nombre: string (nullable = true)
 |-- edad: long (nullable = true)

+---+------+----+
| id|nombre|edad|
+---+------+----+
|  1|   ana|  36|
|  2|  jose|  30|
+---+------+----+



In [40]:
## inner join considerando igualdad de varias columnas a la vez
users = spark.createDataFrame([(1,'ana','golf'),(2,'jose','polo',)],['id','nombre','deporte'])
users.show()
age = spark.createDataFrame([(1,'eva',33),(2,'jose',30)],['id','nombre','edad'])
age.show()
df = users.join(age,["id", "nombre"])
df.show()

+---+------+-------+
| id|nombre|deporte|
+---+------+-------+
|  1|   ana|   golf|
|  2|  jose|   polo|
+---+------+-------+

+---+------+----+
| id|nombre|edad|
+---+------+----+
|  1|   eva|  33|
|  2|  jose|  30|
+---+------+----+

+---+------+-------+----+
| id|nombre|deporte|edad|
+---+------+-------+----+
|  2|  jose|   polo|  30|
+---+------+-------+----+



In [41]:
## inner join usando una expresión de igualdad entre 2 columnas
users = spark.createDataFrame([(1,'ana'),(2,'jose')],['id','nombre'])
users.show()
age = spark.createDataFrame([(1,36),(2,30)],['ident','edad'])
age.show()
df = users.join(age,users.id == age.ident)
df.show()

+---+------+
| id|nombre|
+---+------+
|  1|   ana|
|  2|  jose|
+---+------+

+-----+----+
|ident|edad|
+-----+----+
|    1|  36|
|    2|  30|
+-----+----+

+---+------+-----+----+
| id|nombre|ident|edad|
+---+------+-----+----+
|  1|   ana|    1|  36|
|  2|  jose|    2|  30|
+---+------+-----+----+



In [42]:
## inner join usando una expresión de igualdad entre 4 columnas
users = spark.createDataFrame([(1,'ana','golf'),(2,'jose','polo',)],['id','name','sport'])
users.show()
age = spark.createDataFrame([(1,'eva',33),(2,'jose',30)],['ident','nombre','edad'])
age.show()
df = users.join(age,(users.id == age.ident) & (users.name == age.nombre))
df.show()

+---+----+-----+
| id|name|sport|
+---+----+-----+
|  1| ana| golf|
|  2|jose| polo|
+---+----+-----+

+-----+------+----+
|ident|nombre|edad|
+-----+------+----+
|    1|   eva|  33|
|    2|  jose|  30|
+-----+------+----+

+---+----+-----+-----+------+----+
| id|name|sport|ident|nombre|edad|
+---+----+-----+-----+------+----+
|  2|jose| polo|    2|  jose|  30|
+---+----+-----+-----+------+----+



In [43]:
## inner join usando una expresión compleja entre 2 columnas
users = spark.createDataFrame([(1,'ana'),(2,'jose')],['id','nombre'])
users.show()
age = spark.createDataFrame([(None,1,33),(2,5,30)],['id1','id2','edad'])
age.show()
cond = ((age.id1.isNotNull() & (users.id == age.id1)) | 
       (age.id1.isNull() & (users.id == age.id2)))
df = users.join(age,cond)
df.show()

+---+------+
| id|nombre|
+---+------+
|  1|   ana|
|  2|  jose|
+---+------+

+----+---+----+
| id1|id2|edad|
+----+---+----+
|null|  1|  33|
|   2|  5|  30|
+----+---+----+

+---+------+----+---+----+
| id|nombre| id1|id2|edad|
+---+------+----+---+----+
|  1|   ana|null|  1|  33|
|  2|  jose|   2|  5|  30|
+---+------+----+---+----+



In [44]:
## left outer join de users y age
users = spark.createDataFrame([(1,'ana'),(2,'jose'),(3,'eva')],['id','nombre'])
users.show()
age = spark.createDataFrame([(1,36),(2,30)],['id','edad'])
age.show()
df = users.join(age,'id','left_outer')
df.show()

+---+------+
| id|nombre|
+---+------+
|  1|   ana|
|  2|  jose|
|  3|   eva|
+---+------+

+---+----+
| id|edad|
+---+----+
|  1|  36|
|  2|  30|
+---+----+

+---+------+----+
| id|nombre|edad|
+---+------+----+
|  1|   ana|  36|
|  3|   eva|null|
|  2|  jose|  30|
+---+------+----+



## Transformación de DataFrames

In [45]:
def load_titanic():
    return spark.read.csv('../../data/Cap7/titanic.csv', header=True, inferSchema=True).drop('Cabin').dropna()

In [46]:
# Operaciones aritméticas entre columnas
titanic = load_titanic()
titanic.selectExpr("Survived","SibSp + Parch As Family", "Age * 12 AS Age").show(3)

+--------+------+-----+
|Survived|Family|  Age|
+--------+------+-----+
|       0|     1|264.0|
|       1|     1|456.0|
|       1|     0|312.0|
+--------+------+-----+
only showing top 3 rows



In [47]:
# Funciones definidas por el usuario (UDF) para transformar columnas
titanic = load_titanic()

def sex_to_num(s):
    ret = None
    if s == 'female':
        ret = 0
    elif s == 'male':
        ret = 1
    return ret

from pyspark.sql.types import IntegerType
spark.udf.register("sex_to_num", sex_to_num, IntegerType())

titanic.selectExpr("Sex", "sex_to_num(Sex) AS Sex_num").show(3)

+------+-------+
|   Sex|Sex_num|
+------+-------+
|  male|      1|
|female|      0|
|female|      0|
+------+-------+
only showing top 3 rows



In [48]:
# Funciones definidas por el usuario (UDF) para transformar columnas
titanic = load_titanic()

spark.udf.register("max_int", max, IntegerType())
    
titanic.selectExpr("SibSp", "Parch", "max_int(SibSp, Parch) AS Max_Family").show(3)

+-----+-----+----------+
|SibSp|Parch|Max_Family|
+-----+-----+----------+
|    1|    0|         1|
|    1|    0|         1|
|    0|    0|         0|
+-----+-----+----------+
only showing top 3 rows



In [49]:
# Funciones definidas por el usuario (UDF) para transformar columnas
titanic = load_titanic()

def scale(n,minv,maxv):
    return (n - minv) / (maxv - minv)

# Se puede obtener los valores minimos y maximos de cada columna a través del DataFrame generado por describe()
summary = titanic.describe().toPandas()
min_age = float(summary.loc[3,'Age'])
max_age = float(summary.loc[4,'Age'])

from pyspark.sql.types import DoubleType
spark.udf.register("scale_Age", lambda x: scale(x, min_age, max_age), DoubleType())

titanic.selectExpr("Age", "scale_Age(Age) AS Scaled_Age").show(3)

+----+-------------------+
| Age|         Scaled_Age|
+----+-------------------+
|22.0| 0.2711736617240513|
|38.0| 0.4722292033174164|
|26.0|0.32143754712239253|
+----+-------------------+
only showing top 3 rows



In [50]:
# Transformación de columnas usando expresiones entre columnas
titanic = load_titanic()
titanic.select(titanic.Survived,(titanic.SibSp + titanic.Parch).alias("Family"), (titanic.Age * 12).alias("Age")).show(3)

+--------+------+-----+
|Survived|Family|  Age|
+--------+------+-----+
|       0|     1|264.0|
|       1|     1|456.0|
|       1|     0|312.0|
+--------+------+-----+
only showing top 3 rows



In [51]:
# Transformaciones con UDFs usando expresiones entre columnas
from pyspark.sql.functions import udf
titanic = load_titanic()

sex_to_num_UDF = udf(sex_to_num, IntegerType())

max_int_UDF = udf(max, IntegerType())

summary = titanic.describe().toPandas()
min_age = float(summary.loc[3,'Age'])
max_age = float(summary.loc[4,'Age'])
scale_Age_UDF = udf(lambda x : scale(x, min_age, max_age), DoubleType())
                   
titanic.select(scale_Age_UDF(titanic.Age).alias("Scaled_Age"), 
               sex_to_num_UDF(titanic.Sex).alias("Sex_Num"),
               max_int_UDF(titanic.SibSp,titanic.Parch).alias("Max_Family")).show(3)

+-------------------+-------+----------+
|         Scaled_Age|Sex_Num|Max_Family|
+-------------------+-------+----------+
| 0.2711736617240513|      1|         1|
| 0.4722292033174164|      0|         1|
|0.32143754712239253|      0|         0|
+-------------------+-------+----------+
only showing top 3 rows



In [52]:
# Una única agregación
titanic = load_titanic()

titanic.groupBy().count().show()
titanic.groupBy().sum('Survived').show()
titanic.groupBy('Pclass').sum('Survived').show()
titanic.groupBy('Pclass','Embarked').sum('Survived').show()

+-----+
|count|
+-----+
|  712|
+-----+

+-------------+
|sum(Survived)|
+-------------+
|          288|
+-------------+

+------+-------------+
|Pclass|sum(Survived)|
+------+-------------+
|     1|          120|
|     3|           85|
|     2|           83|
+------+-------------+

+------+--------+-------------+
|Pclass|Embarked|sum(Survived)|
+------+--------+-------------+
|     3|       C|           18|
|     2|       C|            8|
|     1|       Q|            1|
|     3|       Q|            6|
|     2|       Q|            1|
|     1|       C|           53|
|     1|       S|           66|
|     3|       S|           61|
|     2|       S|           74|
+------+--------+-------------+



In [53]:
# Varias agregaciones del mismo tipo
titanic = load_titanic()

titanic.groupBy('Pclass').sum('Survived', 'Fare').show()

+------+-------------+------------------+
|Pclass|sum(Survived)|         sum(Fare)|
+------+-------------+------------------+
|     1|          120| 16200.85429999999|
|     3|           85| 4696.449500000006|
|     2|           83|3714.5791999999997|
+------+-------------+------------------+



In [54]:
# Varias funciones de agregación diferentes a la vez
titanic = load_titanic()

# Usando diccionarios
titanic.groupBy('Pclass').agg({'*':'count', 'Survived':'sum'}).show()

# Usando una secuencia de funciones de la biblioteca pyspark.sql.functions
from pyspark.sql import functions
titanic.groupBy('Pclass').agg(functions.count('*').alias('Total'), functions.sum('Survived').alias('Survivors')).show()


+------+-------------+--------+
|Pclass|sum(Survived)|count(1)|
+------+-------------+--------+
|     1|          120|     184|
|     3|           85|     355|
|     2|           83|     173|
+------+-------------+--------+

+------+-----+---------+
|Pclass|Total|Survivors|
+------+-----+---------+
|     1|  184|      120|
|     3|  355|       85|
|     2|  173|       83|
+------+-----+---------+



## SQL sobre DataFrames

In [55]:
# Ejecución de código SQL que combina DataFrames
users = spark.createDataFrame([(1,'ana'),(2,'jose')],['id','nombre'])
users.createOrReplaceTempView("users")
age = spark.createDataFrame([(1,36),(2,30)],['id','edad'])
age.createOrReplaceTempView("age")

spark.sql("""SELECT *
             FROM users INNER JOIN age ON users.id == age.id""").show()

+---+------+---+----+
| id|nombre| id|edad|
+---+------+---+----+
|  1|   ana|  1|  36|
|  2|  jose|  2|  30|
+---+------+---+----+



In [56]:
titanic = load_titanic()
titanic.createOrReplaceTempView("titanic")
spark.sql("""SELECT Survived, SibSp+Parch AS Family, sex_to_num(Sex) AS Sex_Num
             FROM titanic 
             WHERE Age > 50
             """).show(3)

+--------+------+-------+
|Survived|Family|Sex_Num|
+--------+------+-------+
|       0|     0|      1|
|       1|     0|      0|
|       1|     0|      0|
+--------+------+-------+
only showing top 3 rows



In [57]:
# Ejecución de código SQL con agregación y ordenación
titanic = load_titanic()
titanic.createOrReplaceTempView("titanic")
spark.sql("""SELECT Pclass, 
                    COUNT(*) AS Total, 
                    SUM(Survived) AS Survivors 
             FROM titanic 
             GROUP BY Pclass
             ORDER BY Pclass ASC
             """).show()

+------+-----+---------+
|Pclass|Total|Survivors|
+------+-----+---------+
|     1|  184|      120|
|     2|  173|       83|
|     3|  355|       85|
+------+-----+---------+



# SparkML

## Clasificación con LinearSVC usando un Pipeline

In [58]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoderEstimator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.classification import LinearSVC
from pyspark.ml import Pipeline

# Partimos en train (80%) + test (20%)
titanic = spark.read.csv('../../data/Cap7/titanic.csv', header=True, inferSchema=True)
titanic = titanic.drop('PassengerId', 'Name', 'Ticket','Cabin')
titanic = titanic.dropna()

train, test = titanic.randomSplit([0.8, 0.2])

# Transforma Sex y Age a valores numéricos (orden alfabético)
indexerSex = StringIndexer(inputCol='Sex', outputCol='Sex_num', stringOrderType='alphabetAsc')
indexerEmbarked = StringIndexer(inputCol='Embarked', outputCol='Embarked_num', stringOrderType='alphabetAsc')

ohe = OneHotEncoderEstimator(inputCols=['Embarked_num'],outputCols=['Embarked_OHE'])
vec = VectorAssembler(inputCols=['Pclass', 'Sex_num', 'Age', 'SibSp', 'Parch', 'Fare', 'Embarked_OHE'], outputCol='features_raw')
sca = MinMaxScaler(inputCol='features_raw', outputCol='features')
clf = LinearSVC(featuresCol='features', labelCol='Survived')

pipeline = Pipeline(stages=[indexerSex, indexerEmbarked, ohe, vec, sca, clf])


# Entrenamos el pipeline y lo usamos para clasificar 'test'
model = pipeline.fit(train)
prediction = model.transform(test)
results = prediction.select('prediction', 'Survived')
results.show(5)


# Evaluación de la clasificación usando la clase experimental MulticlassClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
claseval = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='Survived', metricName='accuracy')
print('Score:', claseval.evaluate(prediction))


# Evaluación de la regresión utilizando mllib (obsolescente)
from pyspark.mllib.evaluation import MulticlassMetrics

rdd = results.rdd.map(lambda row: (row[0], float(row[1]))) # Es necesario representar la clase Survived como float
                                                           # o MulticlassMetrics fallará
metrics = MulticlassMetrics(rdd)
print("Score:", metrics.accuracy)

+----------+--------+
|prediction|Survived|
+----------+--------+
|       0.0|       0|
|       0.0|       0|
|       0.0|       0|
|       0.0|       0|
|       0.0|       0|
+----------+--------+
only showing top 5 rows

Score: 0.7480916030534351
Score: 0.7480916030534351


## Regresión con LinearRegression usando un Pipeline

In [59]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoderEstimator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline

# Partimos en train (80%) + test (20%)
titanic = spark.read.csv('../../data/Cap7/titanic.csv', header=True, inferSchema=True)
titanic = titanic.drop('PassengerId', 'Name', 'Ticket','Cabin')
titanic = titanic.dropna()

train, test = titanic.randomSplit([0.8, 0.2])

# Transforma Sex y Age a valores numéricos (orden alfabético)
indexerSex = StringIndexer(inputCol='Sex', outputCol='Sex_num', stringOrderType='alphabetAsc')
indexerEmbarked = StringIndexer(inputCol='Embarked', outputCol='Embarked_num', stringOrderType='alphabetAsc')

ohe = OneHotEncoderEstimator(inputCols=['Embarked_num'],outputCols=['Embarked_OHE'])
vec = VectorAssembler(inputCols=['Pclass', 'Sex_num', 'Age', 'SibSp', 'Parch', 'Survived', 'Embarked_OHE'], outputCol='features_raw')
sca = MinMaxScaler(inputCol='features_raw', outputCol='features')
reg = LinearRegression(featuresCol='features', labelCol='Fare')

pipeline = Pipeline(stages=[indexerSex, indexerEmbarked, ohe, vec, sca, reg])


# Entrenamos el pipeline y lo usamos para inferir tarifas a partir de las instancias en 'test'
model = pipeline.fit(train)
prediction = model.transform(test)
results = prediction.select('Prediction', 'Fare')
results.show(5)

# Evaluación utilizando la clase experimental RegressionEvaluator
from pyspark.ml.evaluation import RegressionEvaluator

maeeval = RegressionEvaluator(predictionCol='Prediction', labelCol='Fare', metricName='mae')
print("MAE :", maeeval.evaluate(results))
mseeval = RegressionEvaluator(predictionCol='Prediction', labelCol='Fare', metricName='mse')
print("MSE :", mseeval.evaluate(results))
rmseeval = RegressionEvaluator(predictionCol='Prediction', labelCol='Fare', metricName='rmse')
print("RMSE:", rmseeval.evaluate(results),'\n')

# Evaluación de la regresión utilizando mllib (obsolescente)
from pyspark.mllib.evaluation import RegressionMetrics

rdd = results.rdd
metrics = RegressionMetrics(rdd)
print("MAE :", metrics.meanAbsoluteError)
print("MSE :", metrics.meanSquaredError)
print("RMSE:", metrics.rootMeanSquaredError)

+------------------+--------+
|        Prediction|    Fare|
+------------------+--------+
| 77.19787404192037| 77.2875|
| 62.41948632392477|     5.0|
|60.089804041297455|    52.0|
|  99.9158323353225|110.8833|
| 94.36434230867826| 61.3792|
+------------------+--------+
only showing top 5 rows

MAE : 24.07650925132509
MSE : 2166.7990121545654
RMSE: 46.54888840944073 

MAE : 24.07650925132509
MSE : 2166.7990121545654
RMSE: 46.54888840944073


## Análisis de grupos con k-means usando un Pipeline

In [60]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoderEstimator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.clustering import KMeans
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import ClusteringEvaluator

# Descartamos las columnas que no nos interesan y eliminamos filas con valores vacíos
titanic = spark.read.csv('../../data/Cap7/titanic.csv', header=True, inferSchema=True)
titanic = titanic.drop('PassengerId', 'Name', 'Ticket','Cabin')
titanic = titanic.dropna()

# Transforma Sex y Age a valores numéricos (orden alfabético)
indexerSex = StringIndexer(inputCol='Sex', outputCol='Sex_num', stringOrderType='alphabetAsc')
indexerEmbarked = StringIndexer(inputCol='Embarked', outputCol='Embarked_num', stringOrderType='alphabetAsc')
# One-hot-encoding de la columna Embarked
ohe = OneHotEncoderEstimator(inputCols=['Embarked_num'],outputCols=['Embarked_OHE'])
# Combina todas las columnas en un único vector
vec = VectorAssembler(inputCols=['Pclass', 'Sex_num', 'Age', 'SibSp', 'Parch', 'Survived', 'Embarked_OHE','Fare'], outputCol='features_raw')
# Normaliza los valores de cada posición del vector
sca = MinMaxScaler(inputCol='features_raw', outputCol='features')
# Realiza análisis de grupos
clu = KMeans(k=3) # valores por defecto: atributos en 'features' y centroide en 'prediction'


pipeline = Pipeline(stages=[indexerSex, indexerEmbarked, ohe, vec, sca, clu])


# Entrenamos el pipeline
model = pipeline.fit(titanic)
prediction = model.transform(titanic)
prediction.select('prediction').show(5)

# Obtenemos los centroides desde el último modelo del pipiline
print('Centroides:')
print(type(model.stages[-1].clusterCenters()[0]))
for c in model.stages[-1].clusterCenters():
    print(c)

# Evaluación utilizando la clase experimental ClusteringEvaluator para obtener el coeficiente de silueta
evaluator = ClusteringEvaluator()
print('Silhouette Coefficient:', evaluator.evaluate(prediction))

# No existen métodos de evaluación de clústers en mllib

+----------+
|prediction|
+----------+
|         0|
|         2|
|         2|
|         2|
|         0|
+----------+
only showing top 5 rows

Centroides:
<class 'numpy.ndarray'>
[0.7575406  0.87006961 0.37345076 0.09791183 0.05800464 0.06960557
 0.11600928 0.         0.04299367]
[0.92592593 0.59259259 0.34540597 0.14814815 0.06790123 0.25925926
 0.         1.         0.03046664]
[0.35433071 0.24409449 0.35892125 0.10629921 0.09645669 0.98818898
 0.31496063 0.00393701 0.11293829]
Silhouette Coefficient: 0.5096854350632269


## Persistencia de modelos en Pipelines

In [61]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoderEstimator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline

# Partimos en train (80%) + test (20%)
titanic = spark.read.csv('../../data/Cap7/titanic.csv', header=True, inferSchema=True)
titanic = titanic.drop('PassengerId', 'Name', 'Ticket','Cabin')
titanic = titanic.dropna()

train, test = titanic.randomSplit([0.8, 0.2])

# Transforma Sex y Age a valores numéricos (orden alfabético)
indexerSex = StringIndexer(inputCol='Sex', outputCol='Sex_num', stringOrderType='alphabetAsc')
indexerEmbarked = StringIndexer(inputCol='Embarked', outputCol='Embarked_num', stringOrderType='alphabetAsc')

ohe = OneHotEncoderEstimator(inputCols=['Embarked_num'],outputCols=['Embarked_OHE'])
vec = VectorAssembler(inputCols=['Pclass', 'Sex_num', 'Age', 'SibSp', 'Parch', 'Survived', 'Embarked_OHE'], outputCol='features_raw')
sca = MinMaxScaler(inputCol='features_raw', outputCol='features')
reg = LinearRegression(featuresCol='features', labelCol='Fare')

pipeline = Pipeline(stages=[indexerSex, indexerEmbarked, ohe, vec, sca, reg])

# Entrenamos el pipeline y lo usamos para inferir tarifas a partir de las instancias en 'test'
model = pipeline.fit(train)
prediction = model.transform(test)
results = prediction.select('Prediction', 'Fare')
results.show(5)

# Volcado del modelo
model.save('../../data/Cap7/regression_model') # Equivalente a write().save(path)
#model.write().overwrite().save('../../data/Cap7/regression_model')

+-----------------+--------+
|       Prediction|    Fare|
+-----------------+--------+
|94.93821195351214|247.5208|
|63.96965715934703|    47.1|
| 63.5952441835162| 50.4958|
| 69.5737043575346|    52.0|
|68.82487840587294|    53.1|
+-----------------+--------+
only showing top 5 rows



In [62]:
from pyspark.ml import PipelineModel

# Carga del modelo
loaded_model = PipelineModel.load('../../data/Cap7/regression_model')

prediction = loaded_model.transform(test)
results = prediction.select('Prediction', 'Fare')
results.show(5)

+-----------------+--------+
|       Prediction|    Fare|
+-----------------+--------+
|94.93821195351214|247.5208|
|63.96965715934703|    47.1|
| 63.5952441835162| 50.4958|
| 69.5737043575346|    52.0|
|68.82487840587294|    53.1|
+-----------------+--------+
only showing top 5 rows

