# RDD creation

#### [Introduction to Spark with Python, by Jose A. Dianes](https://github.com/jadianes/spark-py-notebooks)

Apache Spark trabaja con un conjunto de datos denominados RDD (Resilient Distributed Dataset o Conjunto de Datos Distribuidos Resistentes), estos poseen una serie de características que los hacen diferenciarse de otros tipos de estructuras de datos:
  + Inmutables: Una vez creados no se pueden modificar.
  + Distribuidos: Hace referencia al RDD, están divididos en particiones que están repartidas por el clúster
  + Resilientes: Esto quiere decir que en el caso de que se pierda una partición, esta se regenara automáticamente.

Los RDD a pesar de ser inmutables pueden ser transformados, de manera que se crean un nuevo RDD y estas transformaciones se aplican a los datos del nuevo RDD.

Existen distintas formas de generar RDDs:
  + A partir de un fichero
  + Distribución de datos desde el driver
  + Transformar un RDD para crear un nuevo RDD.

## Ciclo de vida de un RDD

![ciclo de vida de RDD](https://keepcoding.io/wp-content/uploads/2022/06/image-39-1024x473.png)

# SparkContext

SparkContext o Punto de acceso. 

Para realizar operaciones necesitamos un Context: 
  + SparkContext, SQLContext...

Dependerá del tipo de operación al principio estaba SparkContext y se usaba para operaciones con RDDs, despues salio SparkSession, para RDDs, Dataframes y Datsets. 

SparkSession contempla internamente el SparkContext, HiveContext, SQLContext...

SparkSession nos sirve para todos los contextos.

En principio usar SparkSession sería lo más correcto, ya que establecemos una sesión con el nodo maestro.

In [0]:
import pyspark

sc=spark.sparkContext


En SparkContext disponemos de una interfaz gráfica donde se encuentran todos los jobs ejecutados

In [0]:
sc


# Obtención de los datos

Vamos a usar una parte (10%) del dataset utilizado en la KDD Cup de 1999, contiene aproximadamente medio millón de filas. La vamos a descargar de forma local en un archivo Gzip

### Crear un RDD desde un fichero

In [0]:
url = "http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz"
from pyspark import SparkFiles
spark.sparkContext.addFile(url)


In [0]:
myRDD = sc.textFile("file://"+SparkFiles.get("kddcup.data_10_percent.gz"))

In [0]:
type(myRDD)

In [0]:
myRDD.count()

In [0]:
myRDD.take(5)

### Crear un RDD usando paralelización

In [0]:
a = range(100)

data = sc.parallelize(a)
type(data)

'''
Hace p particiones, tb se lo podemos especificar
data = sc.parallelize(a, p)
'''


In [0]:
data.count()

###Obtener datos y particiones

Usaremos el método collect para obtener datos de un dataframe previamente filtrado ya, de esta forma evitamos tratar de cargar datos de forma masiva, lo que podría llevar a errores por falta de memoria

In [0]:

rddCollect = data.collect()

print("Number of Partitions: "+str(data.getNumPartitions()))
print("Action: First element: "+str(data.first()))
print(rddCollect)

# Transformaciones de datos

### Filter

Con esta tipo de transformación podemos generar un RDD con los elementos que cumplan una condición determinada

In [0]:
myRDD.count()

In [0]:
'''
Vamos a filtrar nuestro RDD para contar cuantas conexiones normales hay en el dataset
'''
normal_myRDD = myRDD.filter(lambda x: 'normal.' in x)

In [0]:
normal_myRDD.count()

Como vemos el número total de filas ha cambiado de nuestro dataset original que tenía 494021 filas a 97278.

Como norma general los calculos se realizan cuando ejecutamos una acción, y no cuando realizamos transformaciones

In [0]:
from time import time
t0 = time()
normal_count = normal_myRDD.count()
tt = time() - t0
print("Hay {} interaciones etiquetadas como 'normal'".format(normal_count))
print("Count completedo en {} segundos".format(round(tt,3)))

### Map

Al igual que en Python la función Map nos permite aplicar una función a cada elemento de nuestro RDD.
Las expresiones lambda son muy útiles para este tipo de transformaciones.

In [0]:
csv_data = myRDD.map(lambda x: x.split(","))

t0 = time()
head_rows = csv_data.take(5)
tt = time() - t0
print("Completedo en {} segundos".format(round(tt,3)))


In [0]:
t0 = time()
head_rows = csv_data.take(100000)
tt = time() - t0
print("Completedo en {} segundos".format(round(tt,3)))

In [0]:
def parse_interaction(line):
    elems = line.split(",")
    tag = elems[41]
    return {tag: elems[:-1]}

key_csv_data = myRDD.map(parse_interaction)
head_rows = key_csv_data.take(5)
print(head_rows[0:2])


In [0]:
display(head_rows)

### The Collect action

Collect es otro método de los báscicos que debemos conocer, básicamente lo que hace es coger todos los elementos de un RDD y los carga en nuestra memoria, para poder trabajar con ellos de forma local. 

Debemos ser precavidos cuando lo usemos, especialmente cuando estemos trabajando con RDDs con un gran número de filas.

In [0]:
t0 = time()
all_raw_data = myRDD.collect()
tt = time() - t0
print("Data collected in {} seconds".format(round(tt,3)))


In [0]:
all_raw_data[:3]


# Conceptos Básicos de Spark DataFrame

Vamos a cargar unos datos desde la base de datos spark

In [0]:
df = spark.read.json('dbfs:/databricks-datasets/structured-streaming/events/file-0.json')

In [0]:
type(df)

### Mostrar los datos

In [0]:
df.show()

In [0]:
df.printSchema()

In [0]:
df.columns

In [0]:
df.describe()

### Operaciones con columnas

#### Crear una columna

In [0]:

df['time']

In [0]:
type(df['time'])

In [0]:
'''
Para mostrar la columna como un dataframe
'''
df.select('time').show()

In [0]:
display(df.select('time'))

In [0]:
df.head(2)

In [0]:
display(df.select(df.columns))

In [0]:
display(df.select('time','action'))

#### Renombrar una columna

In [0]:
display(df.withColumnRenamed('action','superaction'))


#### Operaciones con columnas

In [0]:
display(df.withColumn('newtime', df['time']+5))


In [0]:
display(df.withColumn('half_time', df['time']/2))

## SQL

Podemos usar queries de SQL directamente sobre el dataframe, pero para ellos deberemos de guardarlo en una vista temporal.

In [0]:
df.createOrReplaceTempView("IoT")

In [0]:
sql_results = spark.sql("SELECT * FROM IoT")

In [0]:
display(sql_results)

In [0]:
display(spark.sql("SELECT * From IoT WHERE time < 1469501149"))

## Documentación

### [Ejemplos y Documentación Spark](https://sparkbyexamples.com/) 

### [Ejemplos y Documentación PySpark](https://sparkbyexamples.com/pyspark-tutorial/)