# RDD creation

#### [Introduction to Spark with Python, by Jose A. Dianes](https://github.com/jadianes/spark-py-notebooks)

Apache Spark works with a data structure called the RDD (Resilient Distributed Dataset). RDDs have several characteristics that set them apart from other kinds of data structures:
  + Immutable: once created, they cannot be modified.
  + Distributed: an RDD is divided into partitions that are spread across the nodes of the cluster.
  + Resilient: if a partition is lost, Spark rebuilds it automatically by recomputing it from the transformations that produced it.

Although RDDs are immutable, they can be transformed: a transformation does not modify the original RDD, it creates a new RDD holding the result of applying the transformation to the data.
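To make these properties concrete, here is a toy model in plain Python (no Spark required). `ToyRDD` and its methods are hypothetical names invented for this sketch, not the PySpark API. Each instance holds its data in partitions, `map` returns a new object instead of modifying the original, and a lost partition is rebuilt from the lineage (parent plus function), which is the idea behind Spark's resilience.

```python
from typing import Callable, Dict, Optional, Sequence, Tuple


class ToyRDD:
    """Hypothetical toy model of an RDD (not the PySpark API)."""

    def __init__(self, partitions: Sequence[Sequence[int]] = (),
                 parent: Optional["ToyRDD"] = None,
                 func: Optional[Callable[[int], int]] = None):
        # Base data stored as tuples, so it cannot be modified in place
        self._base = tuple(tuple(p) for p in partitions)
        # Lineage: the RDD this one derives from and the function applied to it
        self._parent = parent
        self._func = func
        # Partitions computed so far, keyed by partition index
        self._materialized: Dict[int, Tuple[int, ...]] = {}

    def num_partitions(self) -> int:
        if self._parent is None:
            return len(self._base)
        return self._parent.num_partitions()

    def partition(self, i: int) -> Tuple[int, ...]:
        # Compute partition i on demand, from the base data or from the parent
        if i not in self._materialized:
            if self._parent is None:
                self._materialized[i] = self._base[i]
            else:
                self._materialized[i] = tuple(
                    self._func(x) for x in self._parent.partition(i))
        return self._materialized[i]

    def map(self, func: Callable[[int], int]) -> "ToyRDD":
        # Transformation: builds a NEW ToyRDD and leaves self untouched
        return ToyRDD(parent=self, func=func)

    def lose_partition(self, i: int) -> None:
        # Simulate a node failure by discarding a computed partition
        self._materialized.pop(i, None)

    def collect(self) -> list:
        return [x for i in range(self.num_partitions()) for x in self.partition(i)]


# Usage: two partitions, as if spread over two nodes
numbers = ToyRDD([[1, 2], [3, 4]])
doubled = numbers.map(lambda x: x * 2)
before_failure = doubled.collect()
doubled.lose_partition(1)            # partition 1 is "lost"...
after_failure = doubled.collect()    # ...and recomputed from the lineage
```

In real Spark the lineage is a graph of transformations, which can be inspected with `rdd.toDebugString()`.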

There are several ways to create RDDs:
  + From a file
  + By distributing (parallelizing) data from the driver
  + By transforming an existing RDD into a new RDD

## RDD lifecycle

![RDD lifecycle](https://keepcoding.io/wp-content/uploads/2022/06/image-39-1024x473.png)

# SparkContext

SparkContext, the entry point.

To perform operations we need a context:
  + SparkContext, SQLContext...

Which one we use depends on the type of operation. Originally there was only SparkContext, used for operations on RDDs; later (in Spark 2.0) SparkSession was introduced for RDDs, DataFrames and Datasets.

Internally, SparkSession encompasses SparkContext, HiveContext, SQLContext...

SparkSession therefore serves as a single entry point for all of these contexts.

In principle, using SparkSession is the most appropriate option, since it establishes our session with the cluster's master node.

In [0]:

import pyspark

sc = spark.Context

sc

AttributeError                            Traceback (most recent call last)
<command-497020945842112> in <module>
      1 import pyspark
      2 
----> 3 sc = spark.Context
      4 
      5 sc

AttributeError: 'SparkSession' object has no attribute 'Context'

This cell fails because a `SparkSession` exposes its context through the `sparkContext` attribute, not `Context`. The following cells create (or reuse) the session and retrieve the context correctly.

In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()  # reuses the existing session if there is one

In [0]:
sc = spark.sparkContext

In [0]:
sc

SparkContext provides a graphical interface, the Spark UI, that lists all the jobs that have been executed.

# Obtaining the data

We will use a subset (10%) of the dataset from the 1999 KDD Cup, which contains roughly half a million rows. We will download it locally as a Gzip file.

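### Create an RDD from a file

In [0]:
from pyspark import SparkFiles

url = "http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz"
# Download the file and make it available on every node of the cluster
spark.sparkContext.addFile(url)

`textFile` reads the file line by line into an RDD of strings; Spark decompresses `.gz` files transparently.

In [0]:
# SparkFiles.get() returns the local path of a file added with addFile()
myRDD = sc.textFile("file://" + SparkFiles.get('kddcup.data_10_percent.gz'))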


In [0]:
myRDD.take(5)

Out[11]: ['0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,19,19,1.00,0.00,0.05,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,235,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,29,29,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,219,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,39,39,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,217,2032,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,49,49,1.00,0.00,0.02,0.00,0.00,0.00,0.00,0.00,normal.']

In [0]:
type(myRDD)

Out[12]: pyspark.rdd.RDD

In [0]:
myRDD.count()

Out[13]: 494021

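### Create an RDD by parallelization: data and partitions

`parallelize` distributes a local Python collection from the driver across the cluster; its second argument sets the number of partitions (8 in this example).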

In [0]:
a = range(100)

data = sc.parallelize(a,8)
type(data)


Out[14]: pyspark.rdd.PipelinedRDD

In [0]:
rddCollect = data.collect()
print("Number of Partitions: "+str(data.getNumPartitions()))
print("Action: First element: "+str(data.first()))
print(rddCollect)

Number of Partitions: 8
Action: First element: 0
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99]
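How are the 100 elements spread over the 8 partitions? The sketch below models it in plain Python, assuming the even-slicing rule we understand Spark to use for local collections: partition `i` receives positions `i*n // k` up to (but excluding) `(i+1)*n // k`. `slice_positions` is a hypothetical helper, not part of PySpark; on the real RDD the sizes can be checked with `data.glom().map(len).collect()`.

```python
def slice_positions(n: int, num_slices: int):
    """Hypothetical helper: split positions 0..n-1 into num_slices
    contiguous, nearly equal ranges (integer division decides the cuts)."""
    return [range(i * n // num_slices, (i + 1) * n // num_slices)
            for i in range(num_slices)]


parts = slice_positions(100, 8)
sizes = [len(p) for p in parts]
print(sizes)  # [12, 13, 12, 13, 12, 13, 12, 13]
```

Under this model the first partition holds 0 to 11, which is consistent with `first()` returning 0: it reads from partition 0 first.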


# Data transformations

### Filter

This kind of transformation produces a new RDD containing only the elements that satisfy a given condition.

In [0]:
myRDD.count()


Out[19]: 494021

In [0]:
normal_myRDD = myRDD.filter(lambda x : 'normal' in x)

In [0]:
normal_myRDD.count()

Out[21]: 97278

In [0]:
from time import time
t0 = time()
normal_count = normal_myRDD.count()
tt = time() - t0
print("There are {} interactions labelled as 'normal'".format(normal_count))
print("Count completed in {} seconds".format(round(tt,3)))

There are 97278 interactions labelled as 'normal'
Count completed in 1.753 seconds


As we can see, the filtered RDD contains 97278 rows, compared with the 494021 rows of the original dataset. `myRDD` itself is unchanged: `filter` returned a new RDD.

As a general rule, computations are performed only when an action (such as `count`) is executed, not when transformations (such as `filter` or `map`) are defined. This behavior is known as lazy evaluation.
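Lazy evaluation can be illustrated with a toy model in plain Python. `LazyDataset` is a hypothetical class, not the PySpark API: its transformations only record a step and return a new object, and its actions (`count`, `take`) are the only methods that actually run the pipeline. A counter on the predicate shows that nothing is evaluated until the action is called.

```python
import itertools
from typing import Callable, Iterable, Tuple


class LazyDataset:
    """Hypothetical toy model of lazy evaluation (not the PySpark API).
    The source must be re-iterable (e.g. a list or a range)."""

    def __init__(self, source: Iterable, steps: Tuple = ()):
        self._source = source
        self._steps = steps  # recorded transformations, not yet executed

    # Transformations: record the step and return a new object
    def map(self, func: Callable) -> "LazyDataset":
        return LazyDataset(self._source, self._steps + (("map", func),))

    def filter(self, pred: Callable) -> "LazyDataset":
        return LazyDataset(self._source, self._steps + (("filter", pred),))

    def _run(self):
        # Apply every recorded step to each element, one element at a time
        for x in self._source:
            keep = True
            for kind, func in self._steps:
                if kind == "map":
                    x = func(x)
                elif not func(x):
                    keep = False
                    break
            if keep:
                yield x

    # Actions: these actually execute the pipeline
    def count(self) -> int:
        return sum(1 for _ in self._run())

    def take(self, n: int) -> list:
        return list(itertools.islice(self._run(), n))


calls = []  # every element the predicate has been evaluated on


def is_even(x: int) -> bool:
    calls.append(x)
    return x % 2 == 0


evens = LazyDataset(range(10)).filter(is_even).map(lambda x: x * 10)
calls_before_action = len(calls)  # 0: defining transformations ran nothing
result = evens.count()            # the action triggers the computation
calls_after_action = len(calls)   # 10: every element has now been tested
```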

### Map

As with Python's built-in `map`, the `map` transformation applies a function to every element of our RDD.
Lambda expressions are very handy for this kind of transformation.

In [0]:
csv_data = myRDD.map(lambda x: x.split(","))

t0 = time()
head_rows = csv_data.take(5)
tt = time() - t0
print("Completed in {} seconds".format(round(tt,3)))



Completed in 0.865 seconds


In [0]:
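def parse_interaction(line):
    # Split the CSV line into its 42 fields
    elems = line.split(",")
    # The last field (index 41) is the label, e.g. 'normal.'
    tag = elems[41]
    # Return a (key, value) pair: the label and the full list of fields
    return (tag, elems)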

key_csv_data = myRDD.map(parse_interaction)
head_rows = key_csv_data.take(5)
print(head_rows[0:2])

[('normal.', ['0', 'tcp', 'http', 'SF', '181', '5450', '0', '0', '0', '0', '0', '1', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '8', '8', '0.00', '0.00', '0.00', '0.00', '1.00', '0.00', '0.00', '9', '9', '1.00', '0.00', '0.11', '0.00', '0.00', '0.00', '0.00', '0.00', 'normal.']), ('normal.', ['0', 'tcp', 'http', 'SF', '239', '486', '0', '0', '0', '0', '0', '1', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '8', '8', '0.00', '0.00', '0.00', '0.00', '1.00', '0.00', '0.00', '19', '19', '1.00', '0.00', '0.05', '0.00', '0.00', '0.00', '0.00', '0.00', 'normal.'])]
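Because `parse_interaction` is an ordinary Python function, it can be checked locally, without Spark, on lines copied from the `take(5)` output above. The sketch also counts labels with `collections.Counter`, a local analogue of what the RDD action `countByKey()` would return for `key_csv_data`; here it only covers two sample lines.

```python
from collections import Counter


def parse_interaction(line):
    elems = line.split(",")
    tag = elems[41]  # the label is the last of the 42 fields
    return (tag, elems)


# Two raw lines copied from the myRDD.take(5) output above
sample_lines = [
    "0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.",
    "0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,19,19,1.00,0.00,0.05,0.00,0.00,0.00,0.00,0.00,normal.",
]

parsed = [parse_interaction(line) for line in sample_lines]
label_counts = Counter(tag for tag, _ in parsed)
print(label_counts)  # Counter({'normal.': 2})
```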


### The Collect action

`collect` is another basic method we should know: it retrieves every element of an RDD and loads them into the driver's memory, so that we can work with them locally.

We must be careful when using it, especially with RDDs that have a large number of rows, since the whole dataset has to fit in the driver's memory.
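The difference between `collect` and a bounded action such as `take` can be illustrated with a plain-Python analogy (not Spark code): a generator stands in for a large dataset, `list()` plays the role of `collect` by materializing every record in memory, and `itertools.islice` plays the role of `take(n)` by stopping after `n` records. The dataset size `N` is an arbitrary assumption.

```python
import itertools

counter = {"produced": 0}  # how many records have been generated so far


def records(n):
    """Generator standing in for a large dataset of n records."""
    for i in range(n):
        counter["produced"] += 1
        yield i


N = 100_000

# take(5)-like: only the first 5 records are ever generated
first_five = list(itertools.islice(records(N), 5))
produced_by_take = counter["produced"]

# collect()-like: every record is generated and kept in memory at once
counter["produced"] = 0
everything = list(records(N))
produced_by_collect = counter["produced"]

print(produced_by_take, produced_by_collect)  # 5 100000
```

On large RDDs, `take(n)`, `first()` or `count()` are the safer habits when the full data is not really needed on the driver.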

In [0]:

t0 = time()
all_raw_data = myRDD.collect()
tt = time() - t0
print("Data collected in {} seconds".format(round(tt,3)))


Data collected in 4.305 seconds


In [0]:
all_raw_data[:3]


Out[26]: ['0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,19,19,1.00,0.00,0.05,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,235,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,29,29,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.']

# Spark DataFrame basics

We will load some sample data from the datasets that Databricks provides under `dbfs:/databricks-datasets`.

In [0]:
df = spark.read.json('dbfs:/databricks-datasets/structured-streaming/events/file-0.json')


In [0]:
type(df)


Out[28]: pyspark.sql.dataframe.DataFrame

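### Displaying the data

`show()` prints a text table and works in any PySpark environment; `display()` is a Databricks notebook function that renders an interactive table.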

In [0]:
df.show()


+------+----------+
|action|      time|
+------+----------+
|  Open|1469501107|
|  Open|1469501147|
|  Open|1469501202|
|  Open|1469501219|
|  Open|1469501225|
|  Open|1469501234|
|  Open|1469501245|
|  Open|1469501246|
|  Open|1469501248|
|  Open|1469501256|
|  Open|1469501264|
|  Open|1469501266|
|  Open|1469501267|
|  Open|1469501269|
|  Open|1469501271|
|  Open|1469501282|
|  Open|1469501285|
|  Open|1469501291|
|  Open|1469501297|
|  Open|1469501303|
+------+----------+
only showing top 20 rows



In [0]:
display(df)

| action | time |
|--------|------------|
| Open | 1469501107 |
| Open | 1469501147 |
| Open | 1469501202 |
| Open | 1469501219 |
| Open | 1469501225 |
| Open | 1469501234 |
| Open | 1469501245 |
| Open | 1469501246 |
| Open | 1469501248 |
| Open | 1469501256 |


In [0]:
df.printSchema()

root
 |-- action: string (nullable = true)
 |-- time: long (nullable = true)



In [0]:
df.columns

Out[32]: ['action', 'time']

In [0]:
df.describe()  # returns a DataFrame with the summary statistics; append .show() to print them

Out[34]: DataFrame[summary: string, action: string, time: string]
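`describe()` computes, for each column, the count, mean, standard deviation, minimum and maximum. As a plain-Python illustration (no Spark), the sketch below computes the same five statistics with the `statistics` module, but only over the ten `time` values displayed above, so the numbers will not match `df.describe().show()` on the full DataFrame. It assumes, as we understand it, that Spark's `stddev` here is the sample standard deviation, which is what `statistics.stdev` computes.

```python
import statistics

# The ten `time` values shown by display(df) above
times = [1469501107, 1469501147, 1469501202, 1469501219, 1469501225,
         1469501234, 1469501245, 1469501246, 1469501248, 1469501256]

summary = {
    "count": len(times),
    "mean": statistics.mean(times),
    "stddev": statistics.stdev(times),  # sample standard deviation
    "min": min(times),
    "max": max(times),
}
for name, value in summary.items():
    print(name, value)
```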

### Column operations

#### Select a column

In [0]:
df['time']


Out[35]: Column<'time'>

In [0]:
type(df['time'])

Out[36]: pyspark.sql.column.Column

In [0]:
df.select('time').show()

+----------+
|      time|
+----------+
|1469501107|
|1469501147|
|1469501202|
|1469501219|
|1469501225|
|1469501234|
|1469501245|
|1469501246|
|1469501248|
|1469501256|
|1469501264|
|1469501266|
|1469501267|
|1469501269|
|1469501271|
|1469501282|
|1469501285|
|1469501291|
|1469501297|
|1469501303|
+----------+
only showing top 20 rows



In [0]:
display(df.select('time'))

| time |
|------------|
| 1469501107 |
| 1469501147 |
| 1469501202 |
| 1469501219 |
| 1469501225 |
| 1469501234 |
| 1469501245 |
| 1469501246 |
| 1469501248 |
| 1469501256 |


In [0]:
df.head(2)

Out[39]: [Row(action='Open', time=1469501107), Row(action='Open', time=1469501147)]

#### Rename a column

`withColumnRenamed` returns a new DataFrame; `df` itself keeps the original column name.

In [0]:
display(df.withColumnRenamed('action', 'superaction'))


| superaction | time |
|-------------|------------|
| Open | 1469501107 |
| Open | 1469501147 |
| Open | 1469501202 |
| Open | 1469501219 |
| Open | 1469501225 |
| Open | 1469501234 |
| Open | 1469501245 |
| Open | 1469501246 |
| Open | 1469501248 |
| Open | 1469501256 |


#### Create a derived column

In [0]:
display(df.withColumn('newtime', df['time']*2))

| action | time | newtime |
|--------|------------|------------|
| Open | 1469501107 | 2939002214 |
| Open | 1469501147 | 2939002294 |
| Open | 1469501202 | 2939002404 |
| Open | 1469501219 | 2939002438 |
| Open | 1469501225 | 2939002450 |
| Open | 1469501234 | 2939002468 |
| Open | 1469501245 | 2939002490 |
| Open | 1469501246 | 2939002492 |
| Open | 1469501248 | 2939002496 |
| Open | 1469501256 | 2939002512 |
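Doubling `time` only demonstrates the mechanics of `withColumn`. A more meaningful derived column would turn the values, which look like Unix epoch seconds (an assumption about this dataset), into readable timestamps. In PySpark that could be done with `pyspark.sql.functions.from_unixtime`, which formats using the session time zone; the plain-Python sketch below (standard library only, not Spark) shows the conversion itself, in UTC.

```python
from datetime import datetime, timezone

# First three `time` values from the DataFrame above (assumed epoch seconds)
times = [1469501107, 1469501147, 1469501202]

# Convert each value to a timezone-aware datetime in UTC
timestamps = [datetime.fromtimestamp(t, tz=timezone.utc) for t in times]
for t, ts in zip(times, timestamps):
    print(t, ts.isoformat())
# 1469501107 2016-07-26T02:45:07+00:00
# 1469501147 2016-07-26T02:45:47+00:00
# 1469501202 2016-07-26T02:46:42+00:00
```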


## SQL

We can run SQL queries directly against the DataFrame, but to do so we must first register it as a temporary view.

In [0]:
df.createOrReplaceTempView('IoT')

In [0]:
sql_results = spark.sql('SELECT * FROM IoT')

In [0]:
display(sql_results)

| action | time |
|--------|------------|
| Open | 1469501107 |
| Open | 1469501147 |
| Open | 1469501202 |
| Open | 1469501219 |
| Open | 1469501225 |
| Open | 1469501234 |
| Open | 1469501245 |
| Open | 1469501246 |
| Open | 1469501248 |
| Open | 1469501256 |


In [0]:
display(spark.sql("SELECT * FROM IoT WHERE time < 1469501219"))

| action | time |
|--------|------------|
| Open | 1469501107 |
| Open | 1469501147 |
| Open | 1469501202 |
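A temporary view is a name under which data can be referenced from SQL; in Spark its lifetime is tied to the `SparkSession` that created it. The same idea can be tried without Spark using Python's built-in `sqlite3` module, a different SQL engine swapped in purely for illustration: the sketch loads the ten rows displayed above into an in-memory table, exposes it through a temporary view named `IoT`, and runs the same `WHERE` query. The base table name `events` is hypothetical.

```python
import sqlite3

# The ten rows displayed above
rows = [("Open", 1469501107), ("Open", 1469501147), ("Open", 1469501202),
        ("Open", 1469501219), ("Open", 1469501225), ("Open", 1469501234),
        ("Open", 1469501245), ("Open", 1469501246), ("Open", 1469501248),
        ("Open", 1469501256)]

conn = sqlite3.connect(":memory:")
# Hypothetical base table holding the data (identifiers quoted to be safe)
conn.execute('CREATE TABLE events ("action" TEXT, "time" INTEGER)')
conn.executemany("INSERT INTO events VALUES (?, ?)", rows)
# Temporary view, playing the role of df.createOrReplaceTempView('IoT')
conn.execute("CREATE TEMP VIEW IoT AS SELECT * FROM events")

result = conn.execute(
    'SELECT * FROM IoT WHERE "time" < ? ORDER BY "time"', (1469501219,)
).fetchall()
print(result)
# [('Open', 1469501107), ('Open', 1469501147), ('Open', 1469501202)]
conn.close()
```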
