<font size=10>Práctica 1 de Computación de Altas Prestaciones</font>

# Introducción  
En este laboratorio, cubriremos los elementos básicos de la programación utilizando la metodología de *map-reduce*. Para ello, usaremos Apache Spark como referencia, pero ten en cuenta que existen *frameworks* similares y que los principios se pueden extrapolar.  

## Algunos conceptos y hechos  

- Apache Spark es una plataforma de computación distribuida que opera en un clúster. Al igual que MPI, esperamos que los nodos **NO** compartan un espacio de memoria, pero que estén conectados mediante una red dedicada de alta velocidad. Los sistemas de archivos distribuidos que funcionan sobre la red son extremadamente útiles.  

- Usaremos la siguiente generación del estándar previo de *map-reduce*, Apache Hadoop. La principal diferencia se cree que es el uso de la memoria en lugar del disco para operaciones intermedias, aunque existen muchas más mejoras. 

- Está construido sobre Java. A pesar de ello, puede programarse en Java, Scala, Python o R. La API completa solo se encuentra en lenguajes basados en la JVM, pero el más utilizado es PySpark, ya que la gente suele ser reacia a usar lenguajes basados en la JVM en ciencia de datos. De hecho, dado que Hadoop solo estaba disponible en Java, es probable que algunos códigos en Java de Spark sean adaptaciones de los códigos previos de Hadoop.  

- *Resilient Distributed Dataset* (RDD): la unidad básica que se procesa en Spark. Equivalente a un array de *numpy*, pero distribuido.  
- La API de RDDs generalmente expone las operaciones de bajo nivel de Apache Spark, útiles para el preprocesamiento de datos pero inútiles para el análisis de datos.  

- Para análisis de datos, se utilizan *DataFrame* y Spark SQL. Se basan en una API similar a *pandas* que incluso acepta código SQL (lo cual puede sonar extraño e inútil para desarrolladores, pero muchos científicos de datos y estadísticos *antiguos* son muy competentes en SQL pero no en Python).

- Existes otras APIs para datos en tiempo real (Spark Streaming) o aprendizaje automático (SparkML).


## Como usar en el clúster Apache Spark

El clúster ya cuenta con un pequeño clúster de Apache Spark desplegado, por lo que solo hay que instalar PySpark (cliente de Apache Spark en Python) y registrar nuestra aplicación. Se provee el siguiente *snippet* de código para tal propósito.

In [1]:
%pip install pyspark==4.0.0

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


In [2]:
import os

In [4]:
IP = !hostname -I | cut -d' ' -f1 
IP = IP[0]
IP

'192.168.50.66'

In [5]:
from pyspark.sql import SparkSession

APP_NAME = f"Tutorial de Spark {os.environ['JUPYTERHUB_USER']}" # Mantener siempre en el nombre de la app el nombre del usuario para fácil identificación
SPARK_URL = "spark://spark-master.spark.svc.cluster.local:7077"
spark = SparkSession.builder.appName(APP_NAME).master(SPARK_URL).config("spark.driver.host", IP) \
    .config("spark.executor.memory", "512M") \
    .config("spark.cores.max", 2) \
    .config("spark.executor.instances", 2) \
    .getOrCreate()
sc = spark.sparkContext

KeyError: 'JUPYTERHUB_USER'

In [7]:
assert sc.parallelize(range(100)).count() == 100 # Si falla, es que algo ha ido mal

                                                                                

# Parte : RDDs

## Operaciones básicas

### Parallelize & collect

*Parallize*: Crea un RDD a partir de una lista o un array.  
El segundo argumento indica el número de particiones del RDD. En Apache Spark suele ser frecuente que esto sea un parámetro o depende del origen de los datos (e.g. particiones de datos en el sistema de ficheros distribuidos)

In [8]:
array = sc.parallelize([1,2,3,4,5,6,7,8,9,10], 2)
array

ParallelCollectionRDD[2] at readRDDFromFile at PythonRDD.scala:297

In [9]:
import numpy as np
randomSamples = sc.parallelize(np.random.randn(10))
randomSamples

ParallelCollectionRDD[3] at readRDDFromFile at PythonRDD.scala:297

Pero..., ¿por qué no se pueden imprimir?

Los RDDs no se pueden imprimir, pues *viven* en el clúster y este programa Python se ejecuta en el *driver*.

In [10]:
print(array.collect())
print(randomSamples.collect())

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
[np.float64(-0.981484163969086), np.float64(-1.3481765770624816), np.float64(-0.1168714549235438), np.float64(0.8378759327600198), np.float64(0.4248611719774156), np.float64(-0.451793617163341), np.float64(1.9038670890811706), np.float64(-1.0184416744203308), np.float64(1.258220581284138), np.float64(1.4094470778980188)]


Spark utiliza operaciones *perezosas* (*lazy*) para todo, lo que significa que nada se evalúa hasta que se ejecuta una acción, normalmente una operación de reducción.  
La operación básica de reducción es `collect`, la cual devuelve el RDD completo (es decir, no se realiza ninguna reducción).  


### Otras maneras de cargar datos

Aquí puedes ver tanto un método para cargar un archivo de texto línea por línea como otra operación de reducción.

Nota: Como esto es un clúster, el fichero ha de estar distribuido...

In [11]:
quijote = sc.textFile("hdfs://listener-simple-hdfs-namenode-default-0.hdfs.svc.cluster.local/elquijote.txt")

In [12]:
quijote.take(2)

                                                                                

['DON QUIJOTE DE LA MANCHA', '']

In [13]:
# ¿Quieres ver la documentación en Python?
quijote.take?

[31mSignature:[39m quijote.take(num: int) -> List[~T]
[31mDocstring:[39m
Take the first num elements of the RDD.

It works by first scanning one partition, and use the results from
that partition to estimate the number of additional partitions needed
to satisfy the limit.

Translated from the Scala implementation in RDD#take().

.. versionadded:: 0.7.0

Parameters
----------
num : int
    first number of elements

Returns
-------
list
    the first `num` elements

See Also
--------
:meth:`RDD.first`
:meth:`pyspark.sql.DataFrame.take`

Notes
-----
This method should only be used if the resulting array is expected
to be small, as all the data is loaded into the driver's memory.

Examples
--------
>>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2)
[2, 3]
>>> sc.parallelize([2, 3, 4, 5, 6]).take(10)
[2, 3, 4, 5, 6]
>>> sc.parallelize(range(100), 100).filter(lambda x: x > 90).take(3)
[91, 92, 93]
[31mFile:[39m      /opt/conda/lib/python3.12/site-packages/pyspark/core/rdd.py
[31mType

### Transformaciones

Revisemos algunas transformaciones que se pueden hacer a RDDs.

In [14]:
charsPerLine = quijote.map(lambda s: len(s))
allWords = quijote.flatMap(lambda s: s.split())
allWordsNoArticles = allWords.filter(lambda a: a.lower() not in ["el", "la"])
allWordsUnique = allWords.map(lambda s: s.lower()).distinct()
sampleWords = allWords.sample(withReplacement=True, fraction=0.2, seed=666)
weirdSampling = sampleWords.union(allWordsNoArticles.sample(False, fraction=0.3))

In [15]:
allWordsUnique.take(10)

                                                                                

['ningunos',
 'vuestra',
 'lectura,',
 'pasaban',
 'hijo',
 'valor',
 'cartones',
 'y,',
 'nueva',
 'razón,']

--------


<font size=10 color=red>Ejercicio</font>

Explica el uso y propósito de las operaciones anteriores.

Comenta también sobre el tamaño del RDD resultante en relación con el tamaño del RDD original.  
Por ejemplo, si el RDD original tiene tamaño $N$, entonces `rdd.filter()` tendrá tamaño $K \leq N$.  


--------
Respuesta:
- map
- flatMap
- filter
- distinct
- sample
- union

# map

Uso/Propósito: aplica una función a cada elemento del RDD, devolviendo un nuevo RDD con el mismo número de elementos transformados.
Tamaño: si el RDD original tiene tamaño N, el resultado también tendrá tamaño N (una salida por cada entrada).

# flatMap

Uso/Propósito: aplica una función que devuelve listas/iterables y luego “aplana” los resultados.
Tamaño: si el RDD original tiene tamaño N, el resultado puede tener más, igual o menos de N, dependiendo de cuántos elementos genere cada entrada.

# filter

Uso/Propósito: selecciona solo los elementos que cumplen una condición.
Tamaño: si el RDD original tiene tamaño N, el resultado tendrá tamaño K ≤ N.

# distinct

Uso/Propósito: elimina duplicados de un RDD.
Tamaño: si el RDD original tiene tamaño N, el resultado tendrá tamaño M ≤ N, donde M es el número de elementos únicos.

# sample

Uso/Propósito: devuelve una muestra aleatoria del RDD. Se puede hacer con reemplazo (withReplacement=True) o sin reemplazo.
Tamaño: si el RDD original tiene tamaño N, el resultado tendrá un tamaño esperado de N × fraction (aproximado, ya que es aleatorio).

# union

Uso/Propósito: combina dos RDDs y devuelve un RDD que contiene todos los elementos de ambos.
Tamaño: si los RDDs tienen tamaño N y M, el resultado tendrá tamaño N + M (los duplicados no se eliminan).

# RESUMEN

map → N
flatMap → ≥ 0, variable respecto a N
filter → K ≤ N
distinct → M ≤ N
sample → ≈ N × fraction
union → N + M

----

 ### Acciones



In [6]:
numLines = quijote.count()
numChars = charsPerLine.reduce(lambda a,b: a+b) # also charsPerLine.sum()
sortedWordsByLength = allWordsNoArticles.takeOrdered(10, key=lambda x: -len(x))
numLines, numChars, sortedWordsByLength

NameError: name 'quijote' is not defined

--------


<font size=10 color=red>Ejercicio</font>

Explica el uso y propósito de las transformaciones anteriores.

¿Cómo contarías los elementos de un RDD utilizando únicamente `map` y/o `reduce`?

--------
Respuesta:

count() → cuenta elementos en el RDD.

reduce() → combina todos los elementos con una función (suma en este caso).

takeOrdered() → extrae elementos ordenados según un criterio, aquí las palabras más largas.

Y para sustituir la opración count() con map y reduce hacemos:

count_rdd = rdd.map(lambda _: 1)                        # convierte cada elemento en un "1"

num_elements = count_rdd.reduce(lambda a, b: a + b)     # suma todos los "1"




----

## Key-Value RDDs

In [7]:
import re
import requests
allWords = allWords.flatMap(lambda w: re.sub(""";|:|\.|,|-|–|"|'|\s"""," ", w.lower()).split(" ")).filter(lambda a: len(a)>0)
allWords2 = sc.parallelize(requests.get("https://gist.githubusercontent.com/jsdario/9d871ed773c81bf217f57d1db2d2503f/raw/585de69b0631c805dabc6280506717943b82ba4a/el_quijote_ii.txt").iter_lines())
allWords2 = allWords2.flatMap(lambda w: re.sub(""";|:|\.|,|-|–|"|'|\s"""," ", w.decode("utf8").lower()).split(" ")).filter(lambda a: len(a)>0)

  allWords = allWords.flatMap(lambda w: re.sub(""";|:|\.|,|-|–|"|'|\s"""," ", w.lower()).split(" ")).filter(lambda a: len(a)>0)
  allWords2 = allWords2.flatMap(lambda w: re.sub(""";|:|\.|,|-|–|"|'|\s"""," ", w.decode("utf8").lower()).split(" ")).filter(lambda a: len(a)>0)
  allWords = allWords.flatMap(lambda w: re.sub(""";|:|\.|,|-|–|"|'|\s"""," ", w.lower()).split(" ")).filter(lambda a: len(a)>0)
  allWords2 = allWords2.flatMap(lambda w: re.sub(""";|:|\.|,|-|–|"|'|\s"""," ", w.decode("utf8").lower()).split(" ")).filter(lambda a: len(a)>0)


NameError: name 'allWords' is not defined

In [8]:
allWords.take(10), allWords2.take(10)

NameError: name 'allWords' is not defined

A continuación, pasamos a operaciones más interesantes que involucran RDDs de tipo clave-valor.  
Los RDDs clave-valor son un tipo especial de RDD en los que cada elemento es una tupla `(K, V)`,  
donde `K` es la clave y `V` el valor.  


In [None]:
words = allWords.map(lambda e: (e,1))
words2 = allWords2.map(lambda e: (e,1))

words.take(10)

[('don', 1),
 ('quijote', 1),
 ('de', 1),
 ('la', 1),
 ('mancha', 1),
 ('miguel', 1),
 ('de', 1),
 ('cervantes', 1),
 ('saavedra', 1),
 ('primera', 1)]

### Cómo manipular RDDs clave-valor

In [None]:
frequencies = words.reduceByKey(lambda a,b: a+b)
frequencies2 = words2.reduceByKey(lambda a,b: a+b)
frequencies.takeOrdered(10, key=lambda a: -a[1])

                                                                                

[('que', 10705),
 ('de', 9033),
 ('y', 8668),
 ('la', 5015),
 ('a', 4815),
 ('en', 4046),
 ('el', 3857),
 ('no', 3083),
 ('se', 2382),
 ('los', 2148)]

In [None]:
res = words.groupByKey().takeOrdered(10, key=lambda a: -len(a))
res # To see the content, res[i][1].data

                                                                                

[('ningunos', <pyspark.resultiterable.ResultIterable at 0x7f85924b7680>),
 ('vuestra', <pyspark.resultiterable.ResultIterable at 0x7f85924b7830>),
 ('ello', <pyspark.resultiterable.ResultIterable at 0x7f85924b79e0>),
 ('pasaban', <pyspark.resultiterable.ResultIterable at 0x7f85924b79b0>),
 ('hijo', <pyspark.resultiterable.ResultIterable at 0x7f85924c9640>),
 ('morgante', <pyspark.resultiterable.ResultIterable at 0x7f85924c9760>),
 ('valor', <pyspark.resultiterable.ResultIterable at 0x7f85924c8cb0>),
 ('rincón', <pyspark.resultiterable.ResultIterable at 0x7f85924c8c50>),
 ('cartones', <pyspark.resultiterable.ResultIterable at 0x7f85924ca2a0>),
 ('nueva', <pyspark.resultiterable.ResultIterable at 0x7f85924c97f0>)]

In [None]:
joinFreq = frequencies.join(frequencies2)
joinFreq.take(10)

                                                                                

[('pasaban', (14, 3)),
 ('liberalidad', (11, 9)),
 ('alcancé', (1, 2)),
 ('suben', (2, 2)),
 ('dificultoso', (5, 3)),
 ('mí?', (1, 2)),
 ('deleite', (2, 1)),
 ('enteros', (3, 2)),
 ('alentar', (1, 1)),
 ('enamorado', (38, 19))]

In [None]:
joinFreq.map(lambda e: (e[0], (e[1][0] - e[1][1])/(e[1][0] + e[1][1]))).takeOrdered(10, lambda v: -v[1]), joinFreq.map(lambda e: (e[0], (e[1][0] - e[1][1])/(e[1][0] + e[1][1]))).takeOrdered(10, lambda v: +v[1])

                                                                                

([('bacía', 0.9393939393939394),
  ('venia', 0.9230769230769231),
  ('hermandad', 0.9),
  ('micomicona', 0.8823529411764706),
  ('peña', 0.8823529411764706),
  ('andrés', 0.8823529411764706),
  ('barca', 0.875),
  ('yerme', 0.875),
  ('novela', 0.875),
  ('acertó', 0.8666666666666667)],
 [('teresa', -0.9767441860465116),
  ('roque', -0.96),
  ('refranes', -0.9375),
  ('condesa', -0.9333333333333333),
  ('leones', -0.9333333333333333),
  ('gobernadores', -0.9166666666666666),
  ('lacayo', -0.9166666666666666),
  ('visorrey', -0.9130434782608695),
  ('antonio', -0.9076923076923077),
  ('zaragoza', -0.9047619047619048)])

--------


<font size=10 color=red>Ejercicio</font>

Explica el uso y propósito de las funciones usadas con RDDs clave-valor.

¿Cuáles son acciones y cuáles transformaciones?

¿Qué hace la celda anterior?

--------
**Respuesta:**

1. **Funciones usadas y su propósito**:

   * `join(frequencies2)` → **transformación**: une dos RDDs por clave, generando `(clave, (valor1, valor2))`.
   * `map(lambda e: (e[0], (e[1][0]-e[1][1])/(e[1][0]+e[1][1])))` → **transformación**: calcula la diferencia relativa entre las frecuencias de cada clave.
   * `takeOrdered(n, key=...)` → **acción**: devuelve los `n` elementos con mayor o menor valor según la clave de ordenamiento.

2. **Acciones vs transformaciones**:

   * Transformaciones: `join`, `map` → crean nuevos RDDs sin ejecutar nada todavía.
   * Acciones: `takeOrdered` → desencadena el cálculo y devuelve resultados al driver.

3. **Qué hace la celda**:

   * Une las frecuencias de dos textos por palabra.
   * Calcula la **diferencia relativa** de cada palabra entre ambos textos.
   * Devuelve dos listas:

     * Las 10 palabras más características del **primer texto** (valores positivos altos).
     * Las 10 palabras más características del **segundo texto** (valores negativos bajos).



------

In [None]:
joinFreq.map(lambda e: (e[0], (e[1][0] - e[1][1])/(e[1][0] + e[1][1]))).takeOrdered(10, lambda v: -v[1]), joinFreq.map(lambda e: (e[0], (e[1][0] - e[1][1])/(e[1][0] + e[1][1]))).takeOrdered(10, lambda v: +v[1])

                                                                                

([('bacía', 0.9393939393939394),
  ('venia', 0.9230769230769231),
  ('hermandad', 0.9),
  ('micomicona', 0.8823529411764706),
  ('peña', 0.8823529411764706),
  ('andrés', 0.8823529411764706),
  ('barca', 0.875),
  ('yerme', 0.875),
  ('novela', 0.875),
  ('acertó', 0.8666666666666667)],
 [('teresa', -0.9767441860465116),
  ('roque', -0.96),
  ('refranes', -0.9375),
  ('condesa', -0.9333333333333333),
  ('leones', -0.9333333333333333),
  ('gobernadores', -0.9166666666666666),
  ('lacayo', -0.9166666666666666),
  ('visorrey', -0.9130434782608695),
  ('antonio', -0.9076923076923077),
  ('zaragoza', -0.9047619047619048)])

In [None]:
result = joinFreq.map(lambda e: (e[0], (e[1][0] - e[1][1])/(e[1][0] + e[1][1])))
result.takeOrdered(10, lambda v: -v[1]), result.takeOrdered(10, lambda v: +v[1])

                                                                                

([('bacía', 0.9393939393939394),
  ('venia', 0.9230769230769231),
  ('hermandad', 0.9),
  ('micomicona', 0.8823529411764706),
  ('peña', 0.8823529411764706),
  ('andrés', 0.8823529411764706),
  ('barca', 0.875),
  ('yerme', 0.875),
  ('novela', 0.875),
  ('acertó', 0.8666666666666667)],
 [('teresa', -0.9767441860465116),
  ('roque', -0.96),
  ('refranes', -0.9375),
  ('condesa', -0.9333333333333333),
  ('leones', -0.9333333333333333),
  ('gobernadores', -0.9166666666666666),
  ('lacayo', -0.9166666666666666),
  ('visorrey', -0.9130434782608695),
  ('antonio', -0.9076923076923077),
  ('zaragoza', -0.9047619047619048)])

## Optimizaciones y notas finales de RDDs

### Movimientos de datos

Uno de los principales problemas podría ser que, si los datos después de una operación no están balanceados, es posible que no estemos utilizando el clúster de forma adecuada.  
Para ese propósito, disponemos de dos operaciones.



In [None]:
coalesced_result = result.coalesce(numPartitions=2) # Avoids the data movement, so it tries to balance inside each machine
repartitioned_result = result.repartition(numPartitions=2) # We don't care about data movement, this balance the whole thing to ensure all machines are used

### Persistencia
A diferencia de Hadoop, los RDDs intermedios no se preservan;  
cada vez que usamos una acción/reducción, toda el DAG se ejecuta desde las fuentes de datos.  
Para evitar esto, podemos usar `cache`o `persist` en puntos intermedios del grafo.



In [None]:
words = allWords.cache() # Best effort, le decimos a Spark que intente guardar este resultado intermedio con mayor prioridad.
words.map(lambda e: (e,1))
words.map(lambda e: (e,-1))

PythonRDD[58] at RDD at PythonRDD.scala:56

In [None]:
from pyspark import StorageLevel
# https://spark.apache.org/docs/4.0.0/rdd-programming-guide.html#rdd-persistence
allWords2.persist(StorageLevel.MEMORY_AND_DISK) # Añadimos la persistencia en disco

PythonRDD[59] at RDD at PythonRDD.scala:56

----

### Variables globales

There are two kind of global variables, read-only and write-only.There are two kind of global variables, read-only and write-only.

In [None]:
articles = sc.broadcast(["el", "la"])
articles.value

['el', 'la']

Las variables *broadcast* son de solo lectura.  
Nos ayudan a evitar que las variables locales de los *closures*
(las funciones que usamos dentro de `map`, `reduce`, ...)  
se transfieran en cada operación de Spark.  
De esta manera, se transfieren **solo una vez**.


In [None]:
acc = sc.accumulator(0)
def incrementar(x):
  global acc
  acc += x

allWords.map(lambda l:1).foreach(incrementar)
acc

                                                                                

Accumulator<id=0, value=187045>

Las variables de solo escritura también pueden declararse e inicializarse,  
pero no pueden ser leídas, ya que leerlas forzaría una **sincronización completa** del clúster.  


# API de alto nivel: Spark SQL

Ahora veremos el API de alto nivel, de DataFrames, de Apache Spark.

In [37]:
df = spark.read.parquet("hdfs://listener-simple-hdfs-namenode-default-0.hdfs.svc.cluster.local/reddit_02_2016.parquet")

                                                                                

In [38]:
df.show()

+-------------------+-----------+--------------------+--------------------+--------------------+-------+
|             author|created_utc|               title|           subreddit|            selftext|over_18|
+-------------------+-----------+--------------------+--------------------+--------------------+-------+
|          hunkdaddy| 1454345157|EA's brilliant au...|                FIFA|                    |  false|
|       randomboner2| 1454345157|Science question:...|                 gay|           [removed]|   true|
|      Silver_Dynamo| 1454345157|You catch your SO...|           AskReddit|                    |  false|
|          [deleted]| 1454345157|[F]reckles and li...|            gonewild|           [deleted]|   true|
|          CedarWolf| 1454345157|overview for Ital...|                spam|                    |  false|
|          [deleted]| 1454345157|ONLINE BOOK "Bath...|    earthnewschannel|    Kristen Upchurch|  false|
|        prothirteen| 1454345157|Brantford's first...| 

## Operaciones básicas

In [39]:
df = df.withColumn("over_18tmp", df.over_18.try_cast("boolean")) \
    .drop("over_18") \
    .withColumnRenamed("over_18tmp", "over_18")
df = df.cache()
df.show()

[Stage 53:>                                                         (0 + 1) / 1]

+-------------------+-----------+--------------------+--------------------+--------------------+-------+
|             author|created_utc|               title|           subreddit|            selftext|over_18|
+-------------------+-----------+--------------------+--------------------+--------------------+-------+
|          hunkdaddy| 1454345157|EA's brilliant au...|                FIFA|                    |  false|
|       randomboner2| 1454345157|Science question:...|                 gay|           [removed]|   true|
|      Silver_Dynamo| 1454345157|You catch your SO...|           AskReddit|                    |  false|
|          [deleted]| 1454345157|[F]reckles and li...|            gonewild|           [deleted]|   true|
|          CedarWolf| 1454345157|overview for Ital...|                spam|                    |  false|
|          [deleted]| 1454345157|ONLINE BOOK "Bath...|    earthnewschannel|    Kristen Upchurch|  false|
|        prothirteen| 1454345157|Brantford's first...| 

                                                                                

### Filtrado

In [40]:
df.filter(~df.over_18).show()

+-------------------+-----------+--------------------+--------------------+--------------------+-------+
|             author|created_utc|               title|           subreddit|            selftext|over_18|
+-------------------+-----------+--------------------+--------------------+--------------------+-------+
|          hunkdaddy| 1454345157|EA's brilliant au...|                FIFA|                    |  false|
|      Silver_Dynamo| 1454345157|You catch your SO...|           AskReddit|                    |  false|
|          CedarWolf| 1454345157|overview for Ital...|                spam|                    |  false|
|          [deleted]| 1454345157|ONLINE BOOK "Bath...|    earthnewschannel|    Kristen Upchurch|  false|
|        prothirteen| 1454345157|Brantford's first...|           brantford|                    |  false|
|      alpha_pharaoh| 1454345158|     BAPE X CONVERSE|           bapeheads|                    |  false|
|          [deleted]| 1454345158|Why did the Vikin...| 

In [41]:
df.where(~df.over_18).show()

+-------------------+-----------+--------------------+--------------------+--------------------+-------+
|             author|created_utc|               title|           subreddit|            selftext|over_18|
+-------------------+-----------+--------------------+--------------------+--------------------+-------+
|          hunkdaddy| 1454345157|EA's brilliant au...|                FIFA|                    |  false|
|      Silver_Dynamo| 1454345157|You catch your SO...|           AskReddit|                    |  false|
|          CedarWolf| 1454345157|overview for Ital...|                spam|                    |  false|
|          [deleted]| 1454345157|ONLINE BOOK "Bath...|    earthnewschannel|    Kristen Upchurch|  false|
|        prothirteen| 1454345157|Brantford's first...|           brantford|                    |  false|
|      alpha_pharaoh| 1454345158|     BAPE X CONVERSE|           bapeheads|                    |  false|
|          [deleted]| 1454345158|Why did the Vikin...| 

In [42]:
df.where("not over_18").show() # SQL syntax

+-------------------+-----------+--------------------+--------------------+--------------------+-------+
|             author|created_utc|               title|           subreddit|            selftext|over_18|
+-------------------+-----------+--------------------+--------------------+--------------------+-------+
|          hunkdaddy| 1454345157|EA's brilliant au...|                FIFA|                    |  false|
|      Silver_Dynamo| 1454345157|You catch your SO...|           AskReddit|                    |  false|
|          CedarWolf| 1454345157|overview for Ital...|                spam|                    |  false|
|          [deleted]| 1454345157|ONLINE BOOK "Bath...|    earthnewschannel|    Kristen Upchurch|  false|
|        prothirteen| 1454345157|Brantford's first...|           brantford|                    |  false|
|      alpha_pharaoh| 1454345158|     BAPE X CONVERSE|           bapeheads|                    |  false|
|          [deleted]| 1454345158|Why did the Vikin...| 

### Operaciones



In [43]:
df.select(df.created_utc * 2).show()

+-----------------+
|(created_utc * 2)|
+-----------------+
|       2908690314|
|       2908690314|
|       2908690314|
|       2908690314|
|       2908690314|
|       2908690314|
|       2908690314|
|       2908690316|
|       2908690316|
|       2908690316|
|       2908690316|
|       2908690316|
|       2908690318|
|       2908690318|
|       2908690320|
|       2908690320|
|       2908690320|
|       2908690320|
|       2908690322|
|       2908690322|
+-----------------+
only showing top 20 rows


In [44]:
from pyspark.sql.functions import log
df.select(log(df.created_utc * 2)).show()

+---------------------+
|ln((created_utc * 2))|
+---------------------+
|   21.790968752903122|
|   21.790968752903122|
|   21.790968752903122|
|   21.790968752903122|
|   21.790968752903122|
|   21.790968752903122|
|   21.790968752903122|
|   21.790968753590718|
|   21.790968753590718|
|   21.790968753590718|
|   21.790968753590718|
|   21.790968753590718|
|   21.790968754278314|
|   21.790968754278314|
|   21.790968754965906|
|   21.790968754965906|
|   21.790968754965906|
|   21.790968754965906|
|   21.790968755653502|
|   21.790968755653502|
+---------------------+
only showing top 20 rows


### Agregaciones



In [45]:
df.where("not over_18").groupby(["author", df.subreddit]).count().show()



+-----------------+-------------------+-----+
|           author|          subreddit|count|
+-----------------+-------------------+-----+
|            RPBot|        WarshipFans|   14|
|AamirSaleemAnsari|          Funnypics|    7|
|        morrone95|       malegrooming|    4|
|    ALL_WORM_DIET|          Shadowrun|    1|
|   johnreddit0344|             creepy|    1|
|      Nipplecreek|SandersForPresident|    3|
|    Tayto_Penguin|    northernireland|    1|
|            skknt|        techsupport|    1|
|        [deleted]|             leaves|    4|
|         Szyphoid|                ATT|    1|
|  FLHNorthAmerica|          giveaways|    3|
|   phromadistance|    TwoXChromosomes|    1|
|           Enkrod|      dwarffortress|    1|
|      royzainocor|           DaiMinty|    1|
|    NiceDragon520|        CaseClicker|    1|
|     Spaceman_Jim|       Peterborough|    1|
|    andrewhenry10|     DestinyTheGame|    1|
|       notyourtoy|              proED|    1|
|        Spasticon|             Tw

                                                                                

### Funciones definidas por el usuario (UDFs)

In [46]:
from pyspark.sql.functions import length

df = df.withColumn("length", length(df.selftext)) # This adds a column

df.where("length > 1000").toPandas()

Unnamed: 0,author,created_utc,title,subreddit,selftext,over_18,length
0,scamcop,1454345171,[Item Fraud] TMP - Sheik - 76561198024330229,csgoscammers,##**[Item Fraud] Quick-switch trade scam or at...,False,1399
1,lodejohhsign,1454345175,~HD Kino~ Вспомнить всё 2012 смотреть онлайн в...,WeebSquad420,# **Вспомнить всё 2012 смотреть онлайн в высок...,False,2266
2,Audax-,1454345211,Chaos knight as a mid hero?,DotA2,"Hi,\n\nI would like to discuss the viablity of...",False,3110
3,BigMik_PL,1454345213,The Division BETA is a proof of how much we su...,gaming,Seriously we need to stop. \n\nThere is so lit...,False,1126
4,mairouu,1454345226,"[STORE] Two M9 Marbles, 1 0.002FV M9 Doppler F...",GlobalOffensiveTrade,#M9 Bayonet | Marble Fade (FN)\n\n* 0.028FV - ...,True,1087
...,...,...,...,...,...,...,...
9809,tealness28,1454327036,"It's a boy, but no celebrating just yet...",BabyBumps,So because I'm 35 and considered of advanced m...,False,1105
9810,Grimsrasatoas,1454327043,I'm finally beginning to be able to play the s...,Guitar,"Pretty self explanatory, but I'm finally able ...",False,1796
9811,Ghillieintehmist101,1454327102,Request : Help with Ember Spirit,TrueDoTA2,Hi to anyone that reads this post! \n\nI wante...,False,1077
9812,mykkenny,1454327121,The questions we all want answered before we c...,thedivision,Pretty sure this covers the main gripes:\n\n1)...,False,1211


In [47]:
from pyspark.sql.functions import udf

def splitWords(e):
  return e.split(" ") if e else e

splitWords = udf(splitWords)
df.select(splitWords(df.selftext)).show()

[Stage 64:>                                                         (0 + 1) / 1]

+--------------------+
|splitWords(selftext)|
+--------------------+
|                    |
|         [[removed]]|
|                    |
|         [[deleted]]|
|                    |
| [Kristen, Upchurch]|
|                    |
|                    |
|[What, exactly, w...|
|[I, have, noticed...|
|         [[removed]]|
|                    |
|[Ok, so, this, ac...|
|         [[deleted]]|
|[Selling, my, MW,...|
|      [Comment, PSN]|
|[New, to, the, su...|
|[So, I've, been, ...|
|                    |
|                    |
+--------------------+
only showing top 20 rows


                                                                                

-----

## SQL

### Como declarar vistas de Dataframes

Todo dataframe puede ser usado como una tabla SQL y una sintaxis similar a la usada en otras asignaturas de bases de datos. De hecho, esta es la base del proyecto Apache Hive.

In [48]:
df.createOrReplaceTempView("reddit")

In [49]:
spark.sql("select * from reddit limit 10").show()

+---------------+-----------+--------------------+----------------+--------------------+-------+------+
|         author|created_utc|               title|       subreddit|            selftext|over_18|length|
+---------------+-----------+--------------------+----------------+--------------------+-------+------+
|      hunkdaddy| 1454345157|EA's brilliant au...|            FIFA|                    |  false|     0|
|   randomboner2| 1454345157|Science question:...|             gay|           [removed]|   true|     9|
|  Silver_Dynamo| 1454345157|You catch your SO...|       AskReddit|                    |  false|     0|
|      [deleted]| 1454345157|[F]reckles and li...|        gonewild|           [deleted]|   true|     9|
|      CedarWolf| 1454345157|overview for Ital...|            spam|                    |  false|     0|
|      [deleted]| 1454345157|ONLINE BOOK "Bath...|earthnewschannel|    Kristen Upchurch|  false|    16|
|    prothirteen| 1454345157|Brantford's first...|       brantfo

--------


<font size=10 color=red>Ejercicio</font>

Obten los usuarios que han subido a reddit más de 10 posts de más de 100 carácteres.

--------
Respuesta:

In [51]:
spark.sql("select author, count(*) as c from reddit group by author order by c desc").toPandas()

Unnamed: 0,author,c
0,[deleted],37035
1,rotoreuters,1017
2,ImaBlue,989
3,AutoModerator,773
4,RPBot,663
...,...,...
105391,Baedui,1
105392,aspenluxuryrental,1
105393,Niruk199,1
105394,LampshadeThis,1


----