
In [None]:
!pip install pyspark
!pip install matplotlib

# Create a Spark session

In [27]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType


spark = SparkSession.builder\
                    .appName('prueba_spark')\
                    .master("local[*]")\
                    .getOrCreate()
# Keep a handle on the SparkContext without shadowing the SparkContext class name
sc = spark.sparkContext

To launch the Spark 2 Scala shell (`spark2-shell`) and the PySpark 2 shell (`pyspark2`) from the command line

In [None]:
%sh
spark2-shell
pyspark2

# Basic HDFS commands

In [None]:
%sh
hdfs dfs -mkdir
hdfs dfs -ls
hdfs dfs -rm
hdfs dfs -put <local source> <HDFS destination>
hdfs dfs -get <HDFS source> <local destination>
hdfs dfs -cat

# RDDs

Create RDDs from text files: each line of the file becomes one string element of the RDD

In [None]:
RDDdesdeTXT = spark.sparkContext.textFile("archivo.txt")

RDDdesdeData = sc.textFile("pathDeMiData/")

RDDdesdeLogs = spark.sparkContext.textFile("mydata/*.log")

RDDdesdeMásDeUnTXT = spark.sparkContext.textFile("archivo1.txt,archivo2.txt")

*sc.wholeTextFiles* maps each file in a directory to one element of a single RDD, as a (file path, file content) pair. It is only practical for small files, because the content of each file must fit in memory.



In [None]:
userRDD = sc.wholeTextFiles("NombreDirectorio/")

**RDDs from collections**

They are used for testing, generating data programmatically, integrating with other libraries or systems, or learning.



In [None]:
myData = ["Alice","Carlos","Frank","Barbara", "Alice"]

myRDD = sc.parallelize(myData)


**Saving RDDs**

To save an RDD as plain text

Give the name of a directory, not of a file: the output is written in a distributed way, so it is stored as several part files inside that directory.

In [None]:
myRDD.saveAsTextFile("mydata/")

Common RDD actions:

- .count Returns the number of elements

- .first Returns the first element

- .take(n) Returns an array (Scala) or a list (Python) with the first n elements

- .collect Returns an array (Scala) or a list (Python) with all the elements

In [None]:
myRDD.count()

In [None]:
myRDD.first()

In [None]:
myRDD.take(2)

In [None]:
myRDD.collect()

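Examples of transformations:

- .distinct Creates a new RDD with duplicate elements removed

- .union(rdd) Creates a new RDD by appending the data of another RDD to this one

- .map(function) Creates a new RDD by applying the function to each element of the source RDD

- .filter(function) Creates a new RDD that includes or excludes each record of the source RDD according to a boolean function

- .flatMap(function) Like map, but the function returns a sequence for each element and the results are flattened into a single RDD

- .mapPartitions(function) Applies the function once per partition, passing it an iterator over that partition's elements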

In [None]:
myRDD.distinct().collect()

In [None]:
myRDD.union(RDDdesdeData)

In [None]:
myRDD.map(lambda x: x.upper())

In [None]:
myRDD.filter(lambda x: x.startswith("A"))
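The cells above cover `distinct`, `union`, `map` and `filter`, but not `flatMap` or `mapPartitions`. The sketch below is a plain-Python model of how those two differ from `map`; it does not call Spark. The sample `lines`, the two-partition split and the helper `count_in_partition` are assumptions made for illustration only.

```python
from itertools import chain

lines = ["hello world", "hi"]  # hypothetical input elements

# map: exactly one output element per input element (here, one list per line)
mapped = [line.split(" ") for line in lines]

# flatMap: the function returns a sequence per element and the results are flattened
flat_mapped = list(chain.from_iterable(line.split(" ") for line in lines))

# mapPartitions: the function receives an iterator over a whole partition
partitions = [["hello world"], ["hi"]]  # hypothetical split into two partitions

def count_in_partition(iterator):
    """Yield a single value per partition: the number of elements it holds."""
    yield sum(1 for _ in iterator)

per_partition = list(
    chain.from_iterable(count_in_partition(iter(p)) for p in partitions)
)

print(mapped)         # nested lists, one per line
print(flat_mapped)    # a single flat list of words
print(per_partition)  # one count per partition
```

In PySpark the corresponding calls would be `rdd.map(f)`, `rdd.flatMap(f)` and `rdd.mapPartitions(f)`; how the data is actually split into partitions depends on the cluster and the input source.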

**Transformations between RDDs**

- .union(RDD) Combines **all** the elements of both RDDs (duplicates are kept)

- .intersection(RDD) Returns the elements that both RDDs have in common

- .subtract(RDD) Returns the elements of the original RDD that are not present in the RDD passed as argument

In [None]:
r1 = [1,2,3,3]
r2 = [2,4]

rdd1 = sc.parallelize(r1)
rdd2 = sc.parallelize(r2)

In [None]:
rdd1.union(rdd2).collect()

In [None]:
rdd1.intersection(rdd2).collect()

In [None]:
rdd1.subtract(rdd2).collect()
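As a rough guide to what the three cells above return, here is a plain-Python model of the same operations on `r1` and `r2`. It does not call Spark, and Spark makes no guarantee about the order of the collected elements, so compare the contents rather than the order.

```python
r1 = [1, 2, 3, 3]
r2 = [2, 4]

# union: every element of both collections, duplicates included
union = r1 + r2

# intersection: elements present in both, with duplicates removed
intersection = sorted(set(r1) & set(r2))

# subtract: elements of r1 that do not appear in r2 (duplicates in r1 are kept)
r2_values = set(r2)
subtract = [x for x in r1 if x not in r2_values]

print(union)
print(intersection)
print(subtract)
```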

# DataFrames
---

The SparkSession (required to work with DataFrames) exposes functions and attributes that give access to Spark's functionality




In [16]:
usersschema = StructType([
    StructField("nombre", StringType(), True),
    StructField("apellido", StringType(), True),
    StructField("nacionalidad", StringType(), True)
])

In [29]:
users =[
    {   "nombre": "Juana",
        "apellido": "de Arco",
        "nacionalidad": "Francesa"  },
    {   "nombre": "Leonardo",
        "apellido": "da Vinci",
        "nacionalidad": "Italiano"  },
    {   "nombre": "Cleopatra",
        "apellido": "VII",
        "nacionalidad": "Egipcia"   },
    {   "nombre": "Albert",
        "apellido": "Einstein",
        "nacionalidad": "Aleman"    },
    {   "nombre": "Marie",
        "apellido": "Curie",
        "nacionalidad": "Polaca"    },
]

In [30]:
usersDF = spark.createDataFrame(users)

- spark.read Reads data from different sources


In [None]:
usersJSONDF = spark.read.json("users.json", schema = usersschema)

- spark.sql(query) Runs a query, once the view has been registered with [.createOrReplaceTempView](https://colab.research.google.com/drive/1LwU9sqamD9hfymxMyABUm-jdsOaruPwt#scrollTo=sQrZdPRayaZC)

In [31]:
usersDF.createOrReplaceTempView("users")
spark.sql("SELECT apellido FROM users").show()

+--------+
|apellido|
+--------+
| de Arco|
|da Vinci|
|     VII|
|Einstein|
|   Curie|
+--------+



- spark.catalog Entry point to the [Catalog API](https://spark.apache.org/docs/3.5.2/api/scala/org/apache/spark/sql/catalog/Catalog.html) for managing tables

- spark.conf Runtime configuration of Spark

- spark.sparkContext [Entry point to the core Spark API](https://colab.research.google.com/drive/1LwU9sqamD9hfymxMyABUm-jdsOaruPwt#scrollTo=hfpwxG3rmh_G&line=5&uniqifier=1)

In [None]:
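# spark.catalog.createTable does not accept a DataFrame; persist it as a table through the DataFrameWriter instead
usersDF.write.saveAsTable("tableUsers")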

## Operations on DataFrames

Most common actions:

- .count Returns the number of rows
- .first Returns the first row
- .take(n) Returns the first n rows as an array (a list in Python)
- .show(n) Displays the first n rows as a table (20 by default)
- .collect Returns all the rows of a DataFrame as an array (a list in Python)
- .write Writes the data to a file or another data source


In [None]:
usersDF.count()

In [None]:
usersDF.first()

In [None]:
usersDF.take(2)

In [None]:
usersDF.show(3)

In [None]:
usersDF.collect()

In [38]:
usersDF.write.csv("users.csv")

*Transformations* define a new DataFrame based on the current one, because DataFrames, like RDDs, are immutable

- *.select*: only the specified columns are included

- *.where*: only rows where the specified expression is true are included (synonym for filter)

- *.orderBy*: rows are sorted by the specified column(s) (synonym for sort)

- *.join*: joins two DataFrames on the specified column(s)

- *.limit (n)*: creates a new DataFrame with only the first n rows

Note that *.collect* and *.write* are actions (listed above), not transformations: they trigger execution instead of defining a new DataFrame.

In [39]:
usersnamesDF = usersDF.select("nombre","apellido").orderBy("apellido")
usersnamesDF.show()

+---------+--------+
|   nombre|apellido|
+---------+--------+
|    Marie|   Curie|
|   Albert|Einstein|
|Cleopatra|     VII|
| Leonardo|da Vinci|
|    Juana| de Arco|
+---------+--------+



# Applying a *schema* to a JSON-formatted topic



In [None]:
%spark

// In Scala a trailing dot continues the expression, so no backslashes are needed
val kafkaDF = spark.readStream.format("kafka").
                              option("kafka.bootstrap.servers", "localhost:9092").
                              option("subscribe", "activations").
                              load()

The streaming DataFrame `kafkaDF` reads JSON-formatted data from the `activations` topic

Here we define a `schema` and apply it

In [None]:
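%scala

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.from_json
import spark.implicits._  // enables the $"column" syntax outside the shell

val activationsSchema = StructType( List( StructField("acct_num", IntegerType),
                                          StructField("dev_id", StringType),
                                          StructField("phone", StringType),
                                          StructField("model", StringType)))

// Kafka delivers the value as binary: cast it to string, then parse it with the schema
val activationsDF = kafkaDF.select(from_json($"value".cast("string"), activationsSchema).alias("activation"))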

# Splitting a DataFrame of CSV lines into separate columns
---

In [None]:
from pyspark.sql.functions import *

statusDF =  linesDF. \
            withColumn("model", split(linesDF.value, ",")[0]). \
            withColumn("dev_id", split(linesDF.value, ",")[1]). \
            withColumn("dev_temp",split(linesDF.value, ",")[2].cast("integer")). \
            withColumn("signal",split(linesDF.value, ",")[3].cast("integer"))

# .createOrReplaceTempView("viewName")

Registers a DataFrame as a temporary view so that it can be queried with *spark.sql*

**TO BE COMPLETED**

# Processing Time

In [None]:
%scala

import org.apache.spark.sql.streaming.Trigger.ProcessingTime

// Emit the complete result table to the console every 5 seconds
val sortedModelCountQuery = sortedModelCountDF.writeStream.
                            outputMode("complete").
                            format("console").
                            option("truncate","false").
                            trigger(ProcessingTime("5 seconds")).
                            start()

# Spark SQL Data Types
---

# JOINS - Combining and Splitting DataFrames
---

Joins are expensive in the big-data world
- Perform joins early in the process
- Amortize the cost over many use cases


**Cross join**

Use the `crossJoin` DataFrame method to join every row in the left (`scientists`) DataFrame with every row in the right (`offices`) DataFrame:

**Creates a Cartesian product**

In [None]:
scientists.crossJoin(offices).show()

# Columns with the same name are not renamed. This is called the Cartesian product of the two DataFrames.

**Inner join**

Use a join expression and the value `inner` to return only those rows for which the join expression is true:

In [None]:
scientists.join(offices, scientists["office_id"] == offices["office_id"], "inner").show()

# Since the join key has the same name on both DataFrames, we can simplify the join as follows:

scientists.join(offices, "office_id").show()

**Left semi join**

Use the value `left_semi` to return the rows in the left DataFrame that match rows in the right DataFrame:


In [None]:
scientists.join(offices, scientists["office_id"] == offices["office_id"], "left_semi").show()

**Left anti join**

Use the value `left_anti` to return the rows in the left DataFrame that do not match rows in the right DataFrame:


In [None]:
scientists.join(offices, scientists["office_id"] == offices["office_id"], "left_anti").show()

**Left outer join**

Use the value `left` or `left_outer` to return every row in the left DataFrame with or without matching rows in the right DataFrame:

In [None]:
scientists.join(offices, scientists["office_id"] == offices["office_id"], "left_outer").show()

**Right outer join**

Use the value `right` or `right_outer` to return every row in the right DataFrame with or without matching rows in the left DataFrame:

In [None]:
scientists.join(offices, scientists["office_id"] == offices["office_id"], "right_outer").show()

**Full outer join**

Use the value `full`, `outer`, or `full_outer` to return the union of the left outer and right outer joins (with duplicates removed):


In [None]:
scientists.join(offices, scientists["office_id"] == offices["office_id"], "full_outer").show()
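To make the difference between the join types concrete, the sketch below models four of them in plain Python, without calling Spark. The sample `scientists` and `offices` rows are invented for illustration, and the model assumes each `office_id` appears at most once in `offices`.

```python
# Hypothetical rows: (name, office_id) and (office_id, city)
scientists = [("Ada", 1), ("Alan", 2), ("Grace", 3)]
offices = [(1, "London"), (2, "Manchester"), (4, "Paris")]

cities = dict(offices)    # office_id -> city (assumes unique office_id values)
office_ids = set(cities)

# inner: only rows with a match, carrying columns from both sides
inner = [(n, oid, cities[oid]) for n, oid in scientists if oid in office_ids]

# left_semi: left rows that have a match, left columns only
left_semi = [(n, oid) for n, oid in scientists if oid in office_ids]

# left_anti: left rows that have no match, left columns only
left_anti = [(n, oid) for n, oid in scientists if oid not in office_ids]

# left_outer: every left row; right columns are None where there is no match
left_outer = [(n, oid, cities.get(oid)) for n, oid in scientists]

print(inner)
print(left_semi)
print(left_anti)
print(left_outer)
```

A right outer join is the mirror image of `left_outer`, and a full outer join keeps the unmatched rows from both sides.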

## Joining Streaming DataFrames

Types supported:


|Left DataFrame | Right DataFrame | Supported Joins |
|---------------|-----------------|-----------------|
|streaming      |static           |inner            |
|               |                 |left outer       |
|static         |streaming        |inner            |
|               |                 |right outer      |
|streaming      |streaming        |inner            |
|               |                 |left outer       |
|               |                 |right outer      |


In [None]:
statusWithAccountDF = statusStreamDF. \
      join(accountDevStaticDF,accountDevStaticDF.account_device_id ==statusStreamDF.device_id)

In [None]:
# In this case the column has the same name in both DataFrames

joinedDF = statusStreamDF.join(activationsStreamDF,"dev_id")

## Spark SQL supports the following set operations:

- Union
- Intersection
- Subtraction

Spark SQL also provides a method (`randomSplit`) to split a DataFrame into random subsets


**Union**

In [None]:
driver_names = drivers.select("first_name")

rider_names = riders.select("first_name")


names_union = driver_names.union(rider_names).orderBy("first_name")

Note that union **does not remove duplicates**. Use the distinct method to remove duplicates

In [None]:
names_distinct = names_union.distinct()

**Intersect**

Use the intersect method to return rows that exist in both DataFrames

In [None]:
name_intersect = driver_names.intersect(rider_names).orderBy("first_name")


**Except**

Use the `subtract` method (`except` in the Scala API; `except` is a reserved word in Python) to return the distinct rows in the left DataFrame that do not exist in the right DataFrame

In [None]:
names_subtract = driver_names.subtract(rider_names).orderBy("first_name")

Spark SQL data types are defined in the *pyspark.sql.types* module

Spark SQL supports the following basic data types:
- NullType
- StringType
- Byte array data type
  - BinaryType
- BooleanType
- Integer data types
  - ByteType
  - ShortType
  - IntegerType
  - LongType
- Fixed-point data type
  - DecimalType
- Floating-point data types
  - FloatType
  - DoubleType
- Date and time data types
  - DateType
  - TimestampType

Spark also supports the following complex (collection) types:
- ArrayType
- MapType
- StructType

Spark SQL provides various methods and functions that can be applied to the various data types


# Complex types

[Spark Python API - pyspark.sql.types.ArrayType](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.types.ArrayType.html#pyspark.sql.types.ArrayType)

[Spark Python API - pyspark.sql.types.MapType](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.types.MapType.html#pyspark.sql.types.MapType)

[Spark Python API - pyspark.sql.types.StructType](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.types.StructType.html#pyspark.sql.types.StructType)

# Window functions

Spark SQL supports the following window functions:
- cume_dist
- dense_rank
- lag
- lead
- ntile
- percent_rank
- rank
- row_number


# Sliding window aggregation
---

In [None]:
from pyspark.sql.functions import window

# window(timeColumn, windowDuration, slideDuration=None)
#   timeColumn:     column containing the time at which the event occurred
#   windowDuration: length of each window, as a string such as "1 minute" or "30 seconds"
#   slideDuration:  how often a new window starts; defaults to windowDuration (tumbling windows)
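With a slide shorter than the window length, one event belongs to several overlapping windows. The sketch below is a plain-Python model of that assignment, not Spark's implementation: times are plain integers in seconds, windows are assumed to be aligned to multiples of the slide, and the function name `sliding_windows` is hypothetical.

```python
def sliding_windows(ts, window, slide):
    """Return the [start, end) windows, in seconds, that contain the time ts."""
    # Most recent window start at or before ts, aligned to the slide interval
    start = ts - (ts % slide)
    windows = []
    # Walk backwards one slide at a time while the window still covers ts
    while start + window > ts:
        windows.append((start, start + window))
        start -= slide
    return sorted(windows)

# An event at 12:07:00 expressed as seconds since midnight
event = 12 * 3600 + 7 * 60

# 10-minute windows created every 5 minutes: the event falls into two windows
overlapping = sliding_windows(event, window=600, slide=300)

# Slide equal to the window length (tumbling): the event falls into exactly one
tumbling = sliding_windows(event, window=600, slide=600)

print(overlapping)
print(tumbling)
```

Here the event lands in the 12:00 to 12:10 and 12:05 to 12:15 windows, which is why a grouped count over sliding windows counts each event more than once.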

**EXAMPLE**

Count status messages for selected models over every 10-minute period
- Update the count every five minutes

Use the timestamp column in the status Kafka messages to group by window

In [None]:
kafkaDF = spark.readStream.format("kafka")...

from pyspark.sql.functions import *

modelsTimeDF = kafkaDF.select("timestamp", \
                split(kafkaDF.value, ",")[0].alias("model"))

windowRoninCountsDF = \
      modelsTimeDF.where(modelsTimeDF.model.startswith("Ronin S")). \
      groupBy(window("timestamp","10 minutes","5 minutes"),"model").count()  # Line to highlight

windowRoninCountsQuery = windowRoninCountsDF.writeStream. \
      outputMode("complete").format("console").start()

**WATERMARK EXAMPLE ON A STREAMING DATAFRAME**

In [None]:
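modelsTimeDF = ...

# withWatermark is the line to highlight: it tolerates events arriving up to 1 minute late
watermarkRoninCountsDF = (
    modelsTimeDF
    .where(modelsTimeDF.model.startswith("Ronin S"))
    .withWatermark("timestamp", "1 minute")
    .groupBy(window("timestamp", "10 seconds", "5 seconds"), "model")
    .count()
)

# Note: in "complete" mode Spark keeps every aggregate, so the watermark does not
# discard old state; use "update" or "append" for the watermark to take effect
watermarkRoninCountsQuery = (
    watermarkRoninCountsDF.writeStream
    .outputMode("complete").format("console")
    .option("truncate", "false").start()
)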
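The following plain-Python sketch models what a watermark does to late events. It is a simplification, not Spark's implementation: it updates the watermark after every event, whereas Spark advances it between micro-batches, and it uses tumbling windows with integer seconds. The function name `count_with_watermark` and the sample event times are hypothetical.

```python
def count_with_watermark(event_times, window, delay):
    """Count events per tumbling window, dropping those that arrive too late.

    event_times: event timestamps in seconds, in arrival order.
    window:      window length in seconds.
    delay:       how late an event may be, relative to the latest time seen.
    """
    max_seen = 0
    counts = {}
    dropped = []
    for ts in event_times:
        watermark = max_seen - delay
        start = ts - (ts % window)
        end = start + window
        if watermark > end:
            # The window closed before this event arrived: discard it
            dropped.append(ts)
        else:
            counts[start] = counts.get(start, 0) + 1
        max_seen = max(max_seen, ts)
    return counts, dropped

# The last event (time 101) arrives after the stream has already reached time 200
counts, dropped = count_with_watermark([100, 105, 130, 200, 101], window=10, delay=60)

print(counts)
print(dropped)
```

The event at time 101 is discarded because the watermark (200 - 60 = 140) has already passed the end of its window (110).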

# Dataset
---

Example of how to create a simple Dataset from a Scala sequence (`Seq`)

In [None]:
%scala

val strings = Seq("a string","another string")

val stringDS = spark.createDataset(strings)

stringDS.show

+--------------+
|         value|
+--------------+
|      a string|
|another string|
+--------------+

Scala case classes are a useful way to represent data in a Dataset
- They are often used to create simple data-holding objects in Scala
- Instances of case classes are called products

Encoders define a Dataset's schema using reflection on the object type
- Case class arguments are treated as columns

In [None]:
%scala

case class Name(firstName: String, lastName: String)

val names = Seq(Name("Fred","Flintstone"),Name("Barney","Rubble"))


// Required when not running in the shell, so that case classes can be used as Dataset element types
import spark.implicits._

val nameDS = spark.createDataset(names)

nameDS.show()

+---------+----------+
|firstName|  lastName|
+---------+----------+
|     Fred|Flintstone|
|   Barney|    Rubble|
+---------+----------+

**Create a Dataset from a DataFrame**

```
% JSON
{"firstName":"Grace","lastName":"Hopper"}
{"firstName":"Alan","lastName":"Turing"}
{"firstName":"Ada","lastName":"Lovelace"}
{"firstName":"Charles","lastName":"Babbage"}
```

In [None]:
%scala

val namesDF = spark.read.json("names.json")

case class Name(firstName: String, lastName: String)

val namesDS = namesDF.as[Name]