<font size=10>Práctica 1 de Computación de Altas Prestaciones</font>

# Introducción  
En este laboratorio, cubriremos los elementos básicos de la programación utilizando la metodología de *map-reduce*. Para ello, usaremos Apache Spark como referencia, pero ten en cuenta que existen *frameworks* similares y que los principios se pueden extrapolar.  

## Algunos conceptos y hechos  

- Apache Spark es una plataforma de computación distribuida que opera en un clúster. Al igual que MPI, esperamos que los nodos **NO** compartan un espacio de memoria, pero que estén conectados mediante una red dedicada de alta velocidad. Los sistemas de archivos distribuidos que funcionan sobre la red son extremadamente útiles.  

- Usaremos la siguiente generación del estándar previo de *map-reduce*, Apache Hadoop. La principal diferencia se cree que es el uso de la memoria en lugar del disco para operaciones intermedias, aunque existen muchas más mejoras. 

- Está construido sobre Java. A pesar de ello, puede programarse en Java, Scala, Python o R. La API completa solo se encuentra en lenguajes basados en la JVM, pero el más utilizado es PySpark, ya que la gente suele ser reacia a usar lenguajes basados en la JVM en ciencia de datos. De hecho, dado que Hadoop solo estaba disponible en Java, es probable que algunos códigos en Java de Spark sean adaptaciones de los códigos previos de Hadoop.  

- *Resilient Distributed Dataset* (RDD): la unidad básica que se procesa en Spark. Equivalente a un array de *numpy*, pero distribuido.  
- La API de RDDs generalmente expone las operaciones de bajo nivel de Apache Spark, útiles para el preprocesamiento de datos pero inútiles para el análisis de datos.  

- Para análisis de datos, se utilizan *DataFrame* y Spark SQL. Se basan en una API similar a *pandas* que incluso acepta código SQL (lo cual puede sonar extraño e inútil para desarrolladores, pero muchos científicos de datos y estadísticos *antiguos* son muy competentes en SQL pero no en Python).

- Existes otras APIs para datos en tiempo real (Spark Streaming) o aprendizaje automático (SparkML).


## Como usar en el clúster Apache Spark

El clúster ya cuenta con un pequeño clúster de Apache Spark desplegado, por lo que solo hay que instalar PySpark (cliente de Apache Spark en Python) y registrar nuestra aplicación. Se provee el siguiente *snippet* de código para tal propósito.

In [2]:
%pip install pyspark==4.0.0

Note: you may need to restart the kernel to use updated packages.


In [3]:
import os

In [4]:
from pyspark.sql import SparkSession

IP = !hostname -I | cut -d' ' -f1 
APP_NAME = f"Tutorial de Spark {os.environ['JUPYTERHUB_USER']}" # Mantener siempre en el nombre de la app el nombre del usuario para fácil identificación
SPARK_URL = "spark://spark-master.spark.svc.cluster.local:7077"
spark = SparkSession.builder.appName(APP_NAME).master(SPARK_URL).config("spark.driver.host", IP[0]) \
    .config("spark.executor.memory", "512M") \
    .config("spark.cores.max", 2) \
    .config("spark.executor.instances", 2) \
    .getOrCreate()
sc = spark.sparkContext

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/08 12:47:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
assert sc.parallelize(range(100)).count() == 100 # Si falla, es que algo ha ido mal

                                                                                

# Parte : RDDs

## Operaciones básicas

### Parallelize & collect

*Parallize*: Crea un RDD a partir de una lista o un array.  
El segundo argumento indica el número de particiones del RDD. En Apache Spark suele ser frecuente que esto sea un parámetro o depende del origen de los datos (e.g. particiones de datos en el sistema de ficheros distribuidos)

In [6]:
array = sc.parallelize([1,2,3,4,5,6,7,8,9,10], 2)
array

ParallelCollectionRDD[2] at readRDDFromFile at PythonRDD.scala:297

In [7]:
import numpy as np
randomSamples = sc.parallelize(np.random.randn(10))
randomSamples

ParallelCollectionRDD[3] at readRDDFromFile at PythonRDD.scala:297

Pero..., ¿por qué no se pueden imprimir?

Los RDDs no se pueden imprimir, pues *viven* en el clúster y este programa Python se ejecuta en el *driver*.

Spark utiliza operaciones *perezosas* (*lazy*) para todo, lo que significa que nada se evalúa hasta que se ejecuta una acción, normalmente una operación de reducción.  
La operación básica de reducción es `collect`, la cual devuelve el RDD completo (es decir, no se realiza ninguna reducción).  


### Otras maneras de cargar datos

Aquí puedes ver tanto un método para cargar un archivo de texto línea por línea como otra operación de reducción.

Nota: Como esto es un clúster, el fichero ha de estar distribuido...

In [9]:
quijote = sc.textFile("hdfs://listener-simple-hdfs-namenode-default-0.hdfs.svc.cluster.local/elquijote.txt")

### Transformaciones

Revisemos algunas transformaciones que se pueden hacer a RDDs.

In [64]:
# nos devuelve un nuevo conjunto de datos con la longitud de cada línea.
charsPerLine = quijote.map(lambda s: len(s))

# nos devuelve un nuevo conjunto de datos conformado por las palabras del conjunto de datos, el quijote, inicial.
allWords = quijote.flatMap(lambda s: s.split())

# parte del dataset \textit{allWords} y primero pasa las palabras a minúsculas y luego filtra por ``el'' y ``la''.
allWordsNoArticles = allWords.filter(lambda a: a.lower() not in ["el", "la"])

# utiliza también el dataset allWords. En este caso, pone las palabras en minúsculas y luego realiza la transformación distinct.
allWordsUnique = allWords.map(lambda s: s.lower()).distinct()

# realiza  una transformación sample sobre el dataset allWords con una frecuencia de 0.2 y con reemplazamiento.
sampleWords = allWords.sample(withReplacement=True, fraction=0.2, seed=666)

# realiza una unión entre el dataset obtenido antes con el sample y con un sample obtenido con el dataset allWordsNoArticles con 
# frecuencia 0.3 y sin reemplazamiento.
weirdSampling = sampleWords.union(allWordsNoArticles.sample(False, fraction=0.3))

--------


<font size=10 color=red>Ejercicio</font>

Explica el uso y propósito de las operaciones anteriores.

Comenta también sobre el tamaño del RDD resultante en relación con el tamaño del RDD original.  
Por ejemplo, si el RDD original tiene tamaño $N$, entonces `rdd.filter()` tendrá tamaño $K \leq N$.  


--------
##### Respuesta:

Para comentar las operaciones anteriores, hemos puesto un comentario encima de cada línea, explicando lo que debe suceder tras la ejecución de las operaciones concretas, más allá de unicamente las transformaciones.

Además, comentamos ahora más en cooncreto sobre las transformaciones que nos especifica el enunciado:
- `map`: devuelve un nuevo conjunto de datos distribuido formado al pasar cada elemento de la fuente a través de una función *func*.
  Con respecto al tamaño resultante, `map` acaba con el tamaño original $N$.
- `flatmap`: función similar a `map`, pero cada elemento de entrada se puede asignar a 0 o más elementos de salida.
   Con respecto al tamaño resultante, `flatmap` puede acabar con cualquier tamaño, ya sea $K\leq N$ ó $K\geq N$.
- `filter`: devuelve un nuevo conjunto de datos formado al seleccionar aquellos elementos del conjunto de datos original en los que *func* devuelve `True`.
   En este caso, el tamaño del dataset de salida debe ser $K\leq N$.
- `distinct`: devuelve un nuevo conjunto de datos que contiene los distintos elementos del conjunto de datos original.
  La función `distinct` tiene como salida un conjunto de datos de tamaño $K\leq N$.
- `sample`: esta función muestrea una fracción de los datos, con o sin reemplazamiento, utilizando una semilla generadora de números aleatorios dada.
   En cuanto al tamaño, `sample` puede ser variable. El tamaño esperado será $N\times fraction$, donde $fraction$ es el argumento pasado a la función.
- `union`: devuelve un nuevo dataset que contiene la unión de los elementos en el dataset original y en el pasado por argumento.
  Por lo tanto, el tamaño del nuevo conjunto de datos será $N+M$, siendo $M$ el tamaño del conjunto de datos pasado por argumento.

----

 ### Acciones


In [23]:
# Nos cuenta el número de lineas del Quijote. 
# Como el dato principal de nuestro RDD son líneas, el count nos devuelve el número de elementos en el dataset.
numLines = quijote.count()

# Hace un reduce y obtiene el número de caracteres en el Quijote. Para ello, utiliza la función de reducción de suma 
# en el dataset de caracteres por línea.
numChars = charsPerLine.reduce(lambda a,b: a+b) # also charsPerLine.sum()

# Nos muestra las 10 palabras más largas del dataset allWordsNoArticles.
sortedWordsByLength = allWordsNoArticles.takeOrdered(10, key=lambda x: -len(x))

                                                                                

--------


<font size=10 color=red>Ejercicio</font>

Explica el uso y propósito de las transformaciones anteriores.

¿Cómo contarías los elementos de un RDD utilizando únicamente `map` y/o `reduce`?

--------
##### Respuesta:

Tal y como hemos hecho en el ejercicio anterior, comentamos el funcionamiento de cada transformación/acción en la celda anterior, comentada delante de cada fragmento de código para facilitar la lectura del ejercicio.

Por otro lado, para contar los elementos de un RDD utilizando únicamente `map` y/o `reduce` lo hacemos de la siguiente manera:


In [None]:
# Utilizando map y reduce
count = rdd.map(lambda a: 1).reduce(lambda a,b: a+b)

# Utilizando solo reduce
count = rdd.reduce(lambda a,b: 1+1)

----

## Key-Value RDDs

In [21]:
import re
import requests

# Obtenemos un nuevo conjunto de datos donde quitamos los caracteres especificados en la función re.sub() por un espacio 
# en las palabras pasadas primero a minúsculas. Después de sustituirlo, vuelve a realizar el split para eliminar los espacios 
# y más tarde filtra para quedarse únicamente con las palabras que tienen longitud más que 0, es decir, eliminar los ''.
allWords = allWords.flatMap(lambda w: re.sub(""";|:|\.|,|-|–|"|'|\s"""," ", w.lower()).split(" ")).filter(lambda a: len(a)>0)

# Creamos un nuevo RDD con el Quijote II.
allWords2 = sc.parallelize(requests.get("https://gist.githubusercontent.com/jsdario/9d871ed773c81bf217f57d1db2d2503f/raw/585de69b0631c805dabc6280506717943b82ba4a/el_quijote_ii.txt").iter_lines())

# Realizamos exactamente lo mismo que hemos realizado en la línea 1 para el Quijote I para el Quijote II.
allWords2 = allWords2.flatMap(lambda w: re.sub(""";|:|\.|,|-|–|"|'|\s"""," ", w.decode("utf8").lower()).split(" ")).filter(lambda a: len(a)>0)

  allWords = allWords.flatMap(lambda w: re.sub(""";|:|\.|,|-|–|"|'|\s"""," ", w.lower()).split(" ")).filter(lambda a: len(a)>0)
  allWords2 = allWords2.flatMap(lambda w: re.sub(""";|:|\.|,|-|–|"|'|\s"""," ", w.decode("utf8").lower()).split(" ")).filter(lambda a: len(a)>0)


A continuación, pasamos a operaciones más interesantes que involucran RDDs de tipo clave-valor.  
Los RDDs clave-valor son un tipo especial de RDD en los que cada elemento es una tupla `(K, V)`,  
donde `K` es la clave y `V` el valor.  


In [22]:
# En las dos siguientes líneas creamos un RDD donde sustituimos un elemento del RDD por el par (elemento, 1).
words = allWords.map(lambda e: (e,1))
words2 = allWords2.map(lambda e: (e,1))

### Cómo manipular RDDs clave-valor

In [25]:
# Hace la operación reduceByKey, que nos permite realizar una función sobre los valores con las mismas claves. 
# En este caso, suma el número de veces que cierta palabra, la clave, aparece.
frequencies = words.reduceByKey(lambda a,b: a+b)
frequencies2 = words2.reduceByKey(lambda a,b: a+b)

PythonRDD[65] at RDD at PythonRDD.scala:56


In [28]:
res = words.groupByKey().takeOrdered(10, key=lambda a: -len(a))

                                                                                

In [18]:
# Juntamos las frecuencias de ambos libros Quijote I y Quijote II. El nuevo dataset contendrá pares del estilo (palabra, (freq~Q1, freq~Q2)).
joinFreq = frequencies.join(frequencies2)

In [32]:
# Calcula el ratio, o diferencia relativa, entre los porcentajes del primer Quijote y el segundo. Luego la primera línea obtiene las 
# 10 palabras con mayor ratio, mientras que la segunda, obtiene las 10 palabras de menor ratio
result = joinFreq.map(lambda e: (e[0], (e[1][0] - e[1][1])/(e[1][0] + e[1][1])))
freq_quijote1 = result.takeOrdered(10, lambda v: -v[1])
freq_quijote2 = result.takeOrdered(10, lambda v: +v[1])

                                                                                

--------


<font size=10 color=red>Ejercicio</font>

Explica el uso y propósito de las funciones usadas con RDDs clave-valor.

¿Cuáles son acciones y cuáles transformaciones?

¿Qué hace la celda anterior?

--------
##### Respuesta:

Como hemos hecho en los ejercicios anteriores, comentaremos el propósito y uso de las funciones justo encima de ellas como comentario para facilitar la lectura del ejercicio.

Ahora, especificamos cuales de ellas son acciones y cuales transformaciones:
- Acciones: `takeOrdered`
- Transformaciones: `map`, `flatmap`, `filter`, `join`, `reduceByKey`, `groupByKey`

------

## Optimizaciones y notas finales de RDDs

### Movimientos de datos

Uno de los principales problemas podría ser que, si los datos después de una operación no están balanceados, es posible que no estemos utilizando el clúster de forma adecuada.  
Para ese propósito, disponemos de dos operaciones.



In [33]:
coalesced_result = result.coalesce(numPartitions=2) # Avoids the data movement, so it tries to balance inside each machine
repartitioned_result = result.repartition(numPartitions=2) # We don't care about data movement, this balance the whole thing to ensure all machines are used

### Persistencia
A diferencia de Hadoop, los RDDs intermedios no se preservan;  
cada vez que usamos una acción/reducción, toda el DAG se ejecuta desde las fuentes de datos.  
Para evitar esto, podemos usar `cache`o `persist` en puntos intermedios del grafo.



In [34]:
words = allWords.cache() # Best effort, le decimos a Spark que intente guardar este resultado intermedio con mayor prioridad.
words.map(lambda e: (e,1))
words.map(lambda e: (e,-1))

PythonRDD[93] at RDD at PythonRDD.scala:56

In [28]:
from pyspark import StorageLevel
# https://spark.apache.org/docs/4.0.0/rdd-programming-guide.html#rdd-persistence
allWords2.persist(StorageLevel.MEMORY_AND_DISK) # Añadimos la persistencia en disco

PythonRDD[59] at RDD at PythonRDD.scala:56

----

### Variables globales

There are two kind of global variables, read-only and write-only.There are two kind of global variables, read-only and write-only.

In [29]:
articles = sc.broadcast(["el", "la"])
articles.value

['el', 'la']

Las variables *broadcast* son de solo lectura.  
Nos ayudan a evitar que las variables locales de los *closures*
(las funciones que usamos dentro de `map`, `reduce`, ...)  
se transfieran en cada operación de Spark.  
De esta manera, se transfieren **solo una vez**.


In [30]:
acc = sc.accumulator(0)
def incrementar(x):
  global acc
  acc += x

allWords.map(lambda l:1).foreach(incrementar)
acc

                                                                                

Accumulator<id=0, value=187045>

Las variables de solo escritura también pueden declararse e inicializarse,  
pero no pueden ser leídas, ya que leerlas forzaría una **sincronización completa** del clúster.  


# API de alto nivel: Spark SQL

Ahora veremos el API de alto nivel, de DataFrames, de Apache Spark.

In [36]:
df = spark.read.parquet("hdfs://listener-simple-hdfs-namenode-default-0.hdfs.svc.cluster.local/reddit_02_2016.parquet")

                                                                                

In [37]:
df.show()

[Stage 40:>                                                         (0 + 1) / 1]

+------------------+-----------+--------------------+--------------------+--------------------+-------+
|            author|created_utc|               title|           subreddit|            selftext|over_18|
+------------------+-----------+--------------------+--------------------+--------------------+-------+
|BlairTheWiseViking| 1454326164|(24M) with a DB p...|         deadbedroom|This post will be...|  false|
|        DAVIDSPZGZ| 1454326165|EAL: Rebajas Sony...|     ElAndroideLibre|                    |  false|
|      chasdave1981| 1454326165|  Ms Juicy (retired)|            bigasses|                    |   true|
|         [deleted]| 1454326165|[H] ST MW Aquamar...|GlobalOffensiveTrade|           [deleted]|   true|
|       SableArgyle| 1454326166|Something might h...|            OnePiece|So waaaaaaaay bac...|   true|
|      nilimajaveri| 1454326166|Buy queen bed mat...| InternetIsBeautiful|                    |  false|
|         [deleted]| 1454326166|      Grand entrance|           

                                                                                

## Operaciones básicas

In [38]:
df = df.withColumn("over_18tmp", df.over_18.try_cast("boolean")) \
    .drop("over_18") \
    .withColumnRenamed("over_18tmp", "over_18")
df = df.cache()
df.show()

[Stage 41:>                                                         (0 + 1) / 1]

+------------------+-----------+--------------------+--------------------+--------------------+-------+
|            author|created_utc|               title|           subreddit|            selftext|over_18|
+------------------+-----------+--------------------+--------------------+--------------------+-------+
|BlairTheWiseViking| 1454326164|(24M) with a DB p...|         deadbedroom|This post will be...|  false|
|        DAVIDSPZGZ| 1454326165|EAL: Rebajas Sony...|     ElAndroideLibre|                    |  false|
|      chasdave1981| 1454326165|  Ms Juicy (retired)|            bigasses|                    |   true|
|         [deleted]| 1454326165|[H] ST MW Aquamar...|GlobalOffensiveTrade|           [deleted]|   true|
|       SableArgyle| 1454326166|Something might h...|            OnePiece|So waaaaaaaay bac...|   true|
|      nilimajaveri| 1454326166|Buy queen bed mat...| InternetIsBeautiful|                    |  false|
|         [deleted]| 1454326166|      Grand entrance|           

                                                                                

### Filtrado

In [39]:
df.filter(~df.over_18).show()

+------------------+-----------+--------------------+-------------------+--------------------+-------+
|            author|created_utc|               title|          subreddit|            selftext|over_18|
+------------------+-----------+--------------------+-------------------+--------------------+-------+
|BlairTheWiseViking| 1454326164|(24M) with a DB p...|        deadbedroom|This post will be...|  false|
|        DAVIDSPZGZ| 1454326165|EAL: Rebajas Sony...|    ElAndroideLibre|                    |  false|
|      nilimajaveri| 1454326166|Buy queen bed mat...|InternetIsBeautiful|                    |  false|
|         [deleted]| 1454326166|      Grand entrance|               gifs|           [deleted]|  false|
|     a2zlifestyles| 1454326167|9 Cool Indian Hai...|          HairStyle|                    |  false|
|        dingdong89| 1454326169|Robert Leslie - I...|      BinauralMusic|                    |  false|
|      sleepyechoes| 1454326170|Presidential Foot...|                CFB|

In [40]:
df.where(~df.over_18).show()

+------------------+-----------+--------------------+-------------------+--------------------+-------+
|            author|created_utc|               title|          subreddit|            selftext|over_18|
+------------------+-----------+--------------------+-------------------+--------------------+-------+
|BlairTheWiseViking| 1454326164|(24M) with a DB p...|        deadbedroom|This post will be...|  false|
|        DAVIDSPZGZ| 1454326165|EAL: Rebajas Sony...|    ElAndroideLibre|                    |  false|
|      nilimajaveri| 1454326166|Buy queen bed mat...|InternetIsBeautiful|                    |  false|
|         [deleted]| 1454326166|      Grand entrance|               gifs|           [deleted]|  false|
|     a2zlifestyles| 1454326167|9 Cool Indian Hai...|          HairStyle|                    |  false|
|        dingdong89| 1454326169|Robert Leslie - I...|      BinauralMusic|                    |  false|
|      sleepyechoes| 1454326170|Presidential Foot...|                CFB|

In [41]:
df.where("not over_18").show() # SQL syntax

+------------------+-----------+--------------------+-------------------+--------------------+-------+
|            author|created_utc|               title|          subreddit|            selftext|over_18|
+------------------+-----------+--------------------+-------------------+--------------------+-------+
|BlairTheWiseViking| 1454326164|(24M) with a DB p...|        deadbedroom|This post will be...|  false|
|        DAVIDSPZGZ| 1454326165|EAL: Rebajas Sony...|    ElAndroideLibre|                    |  false|
|      nilimajaveri| 1454326166|Buy queen bed mat...|InternetIsBeautiful|                    |  false|
|         [deleted]| 1454326166|      Grand entrance|               gifs|           [deleted]|  false|
|     a2zlifestyles| 1454326167|9 Cool Indian Hai...|          HairStyle|                    |  false|
|        dingdong89| 1454326169|Robert Leslie - I...|      BinauralMusic|                    |  false|
|      sleepyechoes| 1454326170|Presidential Foot...|                CFB|

### Operaciones



In [42]:
df.select(df.created_utc * 2).show()

+-----------------+
|(created_utc * 2)|
+-----------------+
|       2908652328|
|       2908652330|
|       2908652330|
|       2908652330|
|       2908652332|
|       2908652332|
|       2908652332|
|       2908652334|
|       2908652338|
|       2908652340|
|       2908652342|
|       2908652342|
|       2908652342|
|       2908652344|
|       2908652344|
|       2908652346|
|       2908652346|
|       2908652346|
|       2908652348|
|       2908652348|
+-----------------+
only showing top 20 rows


In [43]:
from pyspark.sql.functions import log
df.select(log(df.created_utc * 2)).show()

+---------------------+
|ln((created_utc * 2))|
+---------------------+
|      21.790955693332|
|     21.7909556940196|
|     21.7909556940196|
|     21.7909556940196|
|   21.790955694707204|
|   21.790955694707204|
|   21.790955694707204|
|   21.790955695394807|
|   21.790955696770016|
|    21.79095569745762|
|   21.790955698145222|
|   21.790955698145222|
|   21.790955698145222|
|   21.790955698832825|
|   21.790955698832825|
|    21.79095569952043|
|    21.79095569952043|
|    21.79095569952043|
|   21.790955700208034|
|   21.790955700208034|
+---------------------+
only showing top 20 rows


### Agregaciones



In [44]:
df.where("not over_18").groupby(["author", df.subreddit]).count().show()



+-----------------+--------------------+-----+
|           author|           subreddit|count|
+-----------------+--------------------+-----+
|   truthfulcoffee|    AndroidQuestions|    1|
|     SuperSaguaro|   10cloverfieldlane|    1|
|        [deleted]|                 CFB|   14|
|       rafamorgan|          airconsole|    1|
|         jsntz007|fastloanunemployedau|    1|
|    AutoModerator|           madisonwi|    1|
|  KarthikVishwash|          technology|    1|
|  thecaptaincltch|     Thecaptaincltch|    1|
|       Born-Hater|             Sverige|    1|
|        [deleted]|              QuestU|    1|
|  jitendragroup01|            business|    2|
|AutonomyForbidden|              MURICA|    1|
|totalitarianghost|      AskProgramming|    1|
|  Gamesfreak13563|    totallynotrobots|    1|
|       fusyanasan|               otoge|    1|
|        [deleted]|                 WC3|    2|
|     Moonkaman227|     leagueoflegends|    1|
|        [deleted]|                GMAT|    1|
|          bu

                                                                                

### Funciones definidas por el usuario (UDFs)

In [45]:
from pyspark.sql.functions import length

df = df.withColumn("length", length(df.selftext)) # This adds a column

df.where("length > 1000").toPandas()

Unnamed: 0,author,created_utc,title,subreddit,selftext,over_18,length
0,BlairTheWiseViking,1454326164,(24M) with a DB problem that's going on three ...,deadbedroom,This post will be long as to give detail about...,False,3872
1,sleepyechoes,1454326170,Presidential Football Poll Results,CFB,I put together the results in probably the lea...,False,7831
2,Daedin_,1454326175,Looking for 3-4 rank 14-18 players to progress...,hearthstone,Hey reddit! Glad to meet you all :)\n\nI'm osc...,False,1669
3,Holo_of_Yoitsu,1454326188,[Spoilers] Diamond no Ace: Second Season - Epi...,anime,**Episode title:** To This Side \n**Episode d...,False,3278
4,SmellyFbuttface,1454326190,Why doesn't challenge mode change?,DestinyTheGame,I like that they integrated challenge mode. G...,False,1043
...,...,...,...,...,...,...,...
9809,-Boman-,1454326072,Warwick The Fast Ticket Out Of Bronze and Silv...,summonerschool,Hello my name is Boman and today i would like ...,False,3409
9810,GoldenJermbag,1454326080,I've finally had my first true high sodium mom...,DestinyTheGame,Before I go tell my woeful tale I will say I l...,False,1767
9811,BhiQ,1454326085,FtMs - your thoughts on the male gender role/s...,asktransgender,"Hey guys, I'll try to be as respectful as I ca...",False,1984
9812,SpaceyMoogle,1454326087,Freaking out a little about possible risk of p...,TwoXChromosomes,Not really sure if this is the right place. \n...,False,1196


In [46]:
from pyspark.sql.functions import udf

def splitWords(e):
  return e.split(" ") if e else e

splitWords = udf(splitWords)
df.select(splitWords(df.selftext)).show()

[Stage 52:>                                                         (0 + 1) / 1]

+--------------------+
|splitWords(selftext)|
+--------------------+
|[This, post, will...|
|                    |
|                    |
|         [[deleted]]|
|[So, waaaaaaaay, ...|
|                    |
|         [[deleted]]|
|                    |
|                    |
|[I, put, together...|
|                    |
|                    |
|                    |
|                    |
|         [[removed]]|
|                    |
|                    |
|                    |
|                    |
|                    |
+--------------------+
only showing top 20 rows


                                                                                

-----

## SQL

### Como declarar vistas de Dataframes

Todo dataframe puede ser usado como una tabla SQL y una sintaxis similar a la usada en otras asignaturas de bases de datos. De hecho, esta es la base del proyecto Apache Hive.

In [47]:
df.createOrReplaceTempView("reddit")

In [48]:
spark.sql("select * from reddit limit 10").show()

+------------------+-----------+--------------------+--------------------+--------------------+-------+------+
|            author|created_utc|               title|           subreddit|            selftext|over_18|length|
+------------------+-----------+--------------------+--------------------+--------------------+-------+------+
|BlairTheWiseViking| 1454326164|(24M) with a DB p...|         deadbedroom|This post will be...|  false|  3872|
|        DAVIDSPZGZ| 1454326165|EAL: Rebajas Sony...|     ElAndroideLibre|                    |  false|     0|
|      chasdave1981| 1454326165|  Ms Juicy (retired)|            bigasses|                    |   true|     0|
|         [deleted]| 1454326165|[H] ST MW Aquamar...|GlobalOffensiveTrade|           [deleted]|   true|     9|
|       SableArgyle| 1454326166|Something might h...|            OnePiece|So waaaaaaaay bac...|   true|   650|
|      nilimajaveri| 1454326166|Buy queen bed mat...| InternetIsBeautiful|                    |  false|     0|
|

--------


<font size=10 color=red>Ejercicio</font>

Obten los usuarios que han subido a reddit más de 10 posts de más de 100 carácteres.

--------
Respuesta:

In [63]:
# Obtenemos primero aquellas entradas en las que la longitud del reddit tiene más de 100 caracteres.
# Despues, agrupamos por autor y contamos aquellos que tienen mas de 10 entradas
resultado = df.where("length > 100").groupBy("author").count().where("count > 10")
resultado.toPandas()


Unnamed: 0,author,count
0,boardgamerecommender,12
1,dota2matchbot,230
2,[deleted],529
3,JmodTracker,11
4,lolretkj,12
5,gyfaglover4,242
6,AutoModerator,616
7,Jesupekka,21
8,autotldr,221
9,Chabombs,13


----