In [1]:
# pip install pyspark

In [2]:
import findspark
findspark.init()

In [3]:
from pyspark.sql import SparkSession

In [4]:

spark = SparkSession.builder.master("local[*]").getOrCreate()

In [5]:
sc = spark.sparkContext


**¿Qué es un RDD?**

Resilient Distributed Dataset (RDD)

Se puede ver como una lista o un vector de objetos. Es el elemento básico de Spark.


In [6]:
# crear rdd vacio

rdd_vacio = sc.emptyRDD

In [7]:
# crear rdd vacio con 3 particiones

rdd_vacio_3 = sc.parallelize([], 3)

In [8]:
rdd_vacio_3.getNumPartitions()

3

In [9]:
# crear rdd no vacio con 3 particiones
rdd_no_vacio_3 = sc.parallelize([1,2,3,4,5], 3)

In [10]:
# ver los datos almacenados en el rdd
rdd_no_vacio_3.collect()

[1, 2, 3, 4, 5]

In [11]:
# Crear un RDD desde un archivo de texto
rdd_texto = sc.textFile('./ejemplo_texto.txt')

In [12]:
rdd_texto.collect()

['Este es un ejemplo de archivo de texto :)',
 'Esto es una frase de prueba 1.',
 'Esto es una frase de prueba 2.',
 'Esto es una frase de prueba 3.',
 'Esto es una frase de prueba 4.',
 'Esto es una frase de prueba 5.']

In [13]:
rdd_texto_completo = sc.wholeTextFiles('./ejemplo_texto.txt')

In [14]:
rdd_texto_completo.collect()

[('file:/c:/Users/Usuario/Documents/fabio/Fabio/Estadistica4all.github.io-1/Notebooks/Spark/ejemplo_texto.txt',
  'Este es un ejemplo de archivo de texto :)\r\nEsto es una frase de prueba 1.\r\nEsto es una frase de prueba 2.\r\nEsto es una frase de prueba 3.\r\nEsto es una frase de prueba 4.\r\nEsto es una frase de prueba 5.')]

Crear rdd a partir de un data frame:

In [None]:
import pandas as pd

data_pandas = pd.read_csv('House_Price_Regression.csv')

In [None]:
data_pandas.head()

In [15]:
# Crear un RDD a partir de un data-frame:

data_spark = spark.read.format("csv").option("header", "true").load("House_Price_Regression.csv")

rdd = sc.parallelize([], 3)

rdd_data_spark = data_spark.rdd

In [16]:
rdd_data_spark.collect()[0:5]

[Row(neighborhood_recode='46.0', latitude='25.113208', longitude='55.138932', price='2700000', no_of_bedrooms='1', no_of_bathrooms='2', quality_recode='2.0', maid_room_recode='0.0', unfurnished_recode='0.0', balcony_recode='1.0', barbecue_area_recode='1.0', central_ac_recode='1.0', childrens_play_area_recode='1.0', childrens_pool_recode='0.0', concierge_recode='1.0', covered_parking_recode='0.0', kitchen_appliances_recode='1.0', maid_service_recode='0.0', pets_allowed_recode='1.0', private_garden_recode='0.0', private_gym_recode='0.0', private_jacuzzi_recode='0.0', private_pool_recode='0.0', security_recode='0.0', shared_gym_recode='1.0', shared_pool_recode='0.0', shared_spa_recode='0.0', view_of_water_recode='1.0', size_in_m_2='100.242337'),
 Row(neighborhood_recode='46.0', latitude='25.106809', longitude='55.151201', price='2850000', no_of_bedrooms='2', no_of_bathrooms='2', quality_recode='2.0', maid_room_recode='0.0', unfurnished_recode='0.0', balcony_recode='1.0', barbecue_area_rec

## **Transformaciones sobre RDD**

Creamos rdd a partir de la columna `price` de `data_pandas`

In [20]:
rdd_price = sc.parallelize(data_pandas['price'])
rdd_price.collect()[0:7]

[2700000, 2850000, 1150000, 2850000, 1729200, 3119900, 8503600]

### **Map**

In [21]:
rdd_price_trans_1 = rdd_price.map(lambda x : x-2)
rdd_price_trans_1.collect()[0:7]

[2699998, 2849998, 1149998, 2849998, 1729198, 3119898, 8503598]

In [23]:
rdd_price_trans_2 = rdd_price.map(lambda x : x > data_pandas['price'].mean())
rdd_price_trans_2.collect()[0:7]

[True, True, False, True, False, True, True]

In [26]:
rdd_texto_trans_3 = rdd_texto.map(lambda x : x.upper())
rdd_texto_trans_3.collect()

['ESTE ES UN EJEMPLO DE ARCHIVO DE TEXTO :)',
 'ESTO ES UNA FRASE DE PRUEBA 1.',
 'ESTO ES UNA FRASE DE PRUEBA 2.',
 'ESTO ES UNA FRASE DE PRUEBA 3.',
 'ESTO ES UNA FRASE DE PRUEBA 4.',
 'ESTO ES UNA FRASE DE PRUEBA 5.']

In [28]:
rdd_texto_trans_4 = rdd_texto.map(lambda x : 'Hola. ' + x)
rdd_texto_trans_4.collect()

['Hola. Este es un ejemplo de archivo de texto :)',
 'Hola. Esto es una frase de prueba 1.',
 'Hola. Esto es una frase de prueba 2.',
 'Hola. Esto es una frase de prueba 3.',
 'Hola. Esto es una frase de prueba 4.',
 'Hola. Esto es una frase de prueba 5.']

### **FlatMap**

In [29]:
rdd_price_trans_5 = rdd_price.map(lambda x: (x, x ** 2))
rdd_price_trans_5.collect()[0:7]

[(2700000, 7290000000000),
 (2850000, 8122500000000),
 (1150000, 1322500000000),
 (2850000, 8122500000000),
 (1729200, 2990132640000),
 (3119900, 9733776010000),
 (8503600, 72311212960000)]

In [31]:
rdd_price_trans_5_flat = rdd_price.flatMap(lambda x: (x, x ** 2))
rdd_price_trans_5_flat.collect()[0:7]

[2700000,
 7290000000000,
 2850000,
 8122500000000,
 1150000,
 1322500000000,
 2850000]

In [34]:
rdd_texto_trans_6 = rdd_texto.map(lambda x: (x, x.upper()))
rdd_texto_trans_6.collect()

[('Este es un ejemplo de archivo de texto :)',
  'ESTE ES UN EJEMPLO DE ARCHIVO DE TEXTO :)'),
 ('Esto es una frase de prueba 1.', 'ESTO ES UNA FRASE DE PRUEBA 1.'),
 ('Esto es una frase de prueba 2.', 'ESTO ES UNA FRASE DE PRUEBA 2.'),
 ('Esto es una frase de prueba 3.', 'ESTO ES UNA FRASE DE PRUEBA 3.'),
 ('Esto es una frase de prueba 4.', 'ESTO ES UNA FRASE DE PRUEBA 4.'),
 ('Esto es una frase de prueba 5.', 'ESTO ES UNA FRASE DE PRUEBA 5.')]

In [35]:
rdd_texto_trans_6_flat = rdd_texto.flatMap(lambda x: (x, x.upper()))
rdd_texto_trans_6_flat.collect()

['Este es un ejemplo de archivo de texto :)',
 'ESTE ES UN EJEMPLO DE ARCHIVO DE TEXTO :)',
 'Esto es una frase de prueba 1.',
 'ESTO ES UNA FRASE DE PRUEBA 1.',
 'Esto es una frase de prueba 2.',
 'ESTO ES UNA FRASE DE PRUEBA 2.',
 'Esto es una frase de prueba 3.',
 'ESTO ES UNA FRASE DE PRUEBA 3.',
 'Esto es una frase de prueba 4.',
 'ESTO ES UNA FRASE DE PRUEBA 4.',
 'Esto es una frase de prueba 5.',
 'ESTO ES UNA FRASE DE PRUEBA 5.']

## **Filter**

## Cerrar sesion y asi liberar memoria:

In [22]:
# spark.stop()