In [2]:
import os
import sys

# Local Spark / Hadoop installation directories (Windows paths from the author's
# machine -- adjust to your own install).
os.environ['SPARK_HOME'] = r'C:/spark/'
os.environ['HADOOP_HOME'] = r'C:/hadoop/'
# Driver launcher settings: make the `pyspark` launcher open Jupyter Lab.
# NOTE(review): presumably these have no effect once we are already inside a
# running kernel -- confirm before relying on them.
os.environ['PYSPARK_DRIVER_PYTHON'] = 'jupyter'
os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'lab'
# Workers must run the same interpreter as this kernel. A bare 'python' is
# resolved through PATH and may be a different installation/version, which
# makes PySpark jobs fail with a driver/worker Python version mismatch.
os.environ['PYSPARK_PYTHON'] = sys.executable

**Spark datasets**

1. RDD
- Low Level API
- Funcionales

2. DataFrames
- High Level API
- Relacional
- Optimizados para querys

**Mapper Transformations**

1. map(f) >> Relación 1 a 1 >> aplica la función f() a cada elemento del RDD
2. mapValues(f) >> Relación 1 a 1 >> en un RDD de parejas (key, value), pasa solo el value de cada pareja a f() y conserva la key sin cambios
3. flatMap(f) >> Relación 1 a muchos >> aplica la función f() a cada elemento del RDD y aplana los resultados
4. flatMapValues(f) >> Relación 1 a muchos >> aplica f() al value de cada pareja (key, value), aplana el resultado y conserva la key en cada elemento generado
5. mapPartitions(f) >> Relación muchos a 1 >> devuelve un RDD aplicando la función f() una vez a cada partición completa del RDD fuente (f recibe todos los elementos de la partición)

# 2. mapValues()


- Es específica para RDDs de parejas key-value.
- Aplica la función a cada uno de los valores, manteniendo las keys sin cambio.
- Produce un nuevo RDD key-value con los datos transformados.


In [3]:
from pyspark.sql import SparkSession
# Create (or reuse) the SparkSession used throughout this notebook.
spark = SparkSession.builder.appName("App1").getOrCreate()
# Data: (key, list of ints) pairs; "A" has an empty list to show the edge case.
data = [
    ("A", []), ("Z", [40]),
    ("C", [10, 20, 30]), ("D", [60, 70])
]
# Build the RDD from the local list
rdd = spark.sparkContext.parallelize(data) # RDD of (str, list[int]) pairs
rdd.collect()

[('A', []), ('Z', [40]), ('C', [10, 20, 30]), ('D', [60, 70])]

In [4]:
def f(x):
    """Demo transform for ``mapValues``.

    Returns 0 for an empty value; otherwise the value's length plus one.
    Examples: [] -> 0, [40] -> 2, [10, 20, 30] -> 4.
    """
    size = len(x)
    return size + 1 if size else 0
# Apply f to the value of every (key, value) pair; the keys pass through unchanged.
rdd2 = rdd.mapValues(f)
rdd2.collect()

[('A', 0), ('Z', 2), ('C', 4), ('D', 3)]

In [5]:
# Display the active SparkSession object (the one created with appName "App1").
spark

In [6]:
# Another example (lambda functions).
# Uses its own name instead of rebinding `rdd`: otherwise re-running the earlier
# mapValues(f) cell after this one would silently operate on the fruit data.
fruits_rdd = spark.sparkContext.parallelize([(1, 'apple'), (2, 'banana'), (3, 'cherry')])
# The lambda receives only the value (the fruit name); the integer keys are kept.
# (Equivalent to passing the built-in `len` directly.)
result = fruits_rdd.mapValues(lambda x: len(x))
result.collect()

[(1, 5), (2, 6), (3, 6)]

# 3. flatMap
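- Es una transformación one-to-many.
- Toma un elemento del RDD y lo transforma en cero, uno o muchos elementos (0, 1, 2, 3, etc.).
- No existe en el DataFrame de Spark; su equivalente es la función `explode`, que genera una fila por cada elemento de un array (las filas con un array vacío desaparecen, como se ve en los ejemplos siguientes).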

In [7]:
# (name, list of languages) rows; "ted" and "andres" have an empty list.
# NOTE(review): this rebinds `data`, previously the list behind the first RDD example.
data= [
    ('David',['Java','Scala','Python']),
    ('Juan',['Cobol','C']),
    ('bob',['C++']),
    ('ted',[]),
    ('andres',[])
]
# Create the DataFrame
df = spark.createDataFrame(
    data= data, schema= ['nombre','Lenguajes']
)
df.show(truncate=False)

+------+---------------------+
|nombre|Lenguajes            |
+------+---------------------+
|David |[Java, Scala, Python]|
|Juan  |[Cobol, C]           |
|bob   |[C++]                |
|ted   |[]                   |
|andres|[]                   |
+------+---------------------+



In [8]:
from pyspark.sql.functions import explode
# explode: DataFrame counterpart of flatMap -- one output row per array element.
# Rows whose array is empty (ted, andres) produce no output rows at all;
# `explode_outer` would keep them with a null instead.
exploded=df.select(df.nombre,
          explode(df.Lenguajes).alias('Lang'))
exploded.show(truncate=False)

+------+------+
|nombre|Lang  |
+------+------+
|David |Java  |
|David |Scala |
|David |Python|
|Juan  |Cobol |
|Juan  |C     |
|bob   |C++   |
+------+------+



In [9]:
# Same idea with several array columns: explode them one at a time (next cells).
some_data = [
    ('david', ['Java','Scala', 'Python'], ['MS', 'PHD']),
    ('andrea', ['Cobol','Snobol'], ['BS', 'MS']),
    ('pedro', ['C++'], ['BS', 'MS', 'PHD']),
    ('juan', [], ['BS', 'MS']),
    ('andres', ['FORTRAN'], []),
    ('sofia', [], [])
]

# NOTE(review): this rebinds `df`. The earlier explode cell reads `df.Lenguajes`,
# which this DataFrame does not have (its column is 'lenguajes'), so re-running
# that cell after this one will likely fail.
df= spark.createDataFrame(data=some_data,
                          schema=['nombre','lenguajes','educacion'])
df.show(truncate=True)

+------+--------------------+-------------+
|nombre|           lenguajes|    educacion|
+------+--------------------+-------------+
| david|[Java, Scala, Pyt...|    [MS, PHD]|
|andrea|     [Cobol, Snobol]|     [BS, MS]|
| pedro|               [C++]|[BS, MS, PHD]|
|  juan|                  []|     [BS, MS]|
|andres|           [FORTRAN]|           []|
| sofia|                  []|           []|
+------+--------------------+-------------+



In [10]:
# Explode only `lenguajes`; `educacion` is carried along unchanged as an array.
# juan and sofia disappear because their `lenguajes` array is empty.
exploded_1= df.select(
    df.nombre,
    explode(df.lenguajes).alias('leng'),
    df.educacion
)
exploded_1.show(truncate=False)

+------+-------+-------------+
|nombre|leng   |educacion    |
+------+-------+-------------+
|david |Java   |[MS, PHD]    |
|david |Scala  |[MS, PHD]    |
|david |Python |[MS, PHD]    |
|andrea|Cobol  |[BS, MS]     |
|andrea|Snobol |[BS, MS]     |
|pedro |C++    |[BS, MS, PHD]|
|andres|FORTRAN|[]           |
+------+-------+-------------+



In [11]:
# Now explode `educacion` too: one row per (language, degree) combination.
# andres disappears at this step because his `educacion` array is empty.
exploded_2= exploded_1.select(
    exploded_1.nombre,
    exploded_1.leng,
    explode(exploded_1.educacion).alias('grado')
)
exploded_2.show(truncate=False)

+------+------+-----+
|nombre|leng  |grado|
+------+------+-----+
|david |Java  |MS   |
|david |Java  |PHD  |
|david |Scala |MS   |
|david |Scala |PHD  |
|david |Python|MS   |
|david |Python|PHD  |
|andrea|Cobol |BS   |
|andrea|Cobol |MS   |
|andrea|Snobol|BS   |
|andrea|Snobol|MS   |
|pedro |C++   |BS   |
|pedro |C++   |MS   |
|pedro |C++   |PHD  |
+------+------+-----+



# Ejemplo sobre datos reales

In [12]:
from pyspark.sql import SparkSession
# Reuse the SparkSession already running in this kernel (or create one when the
# notebook is started from this cell).
spark = SparkSession.builder.getOrCreate()
# Read the CSV: the first line holds the column names, "?" marks a missing value
# (loaded as null), and column types are inferred from the data.
parse = (
    spark.read
    .option("header", "true")
    .option("nullValue", "?")
    .option("inferSchema", "true")
    .csv("./prueba/block_1/block_1.csv")
)
parse

DataFrame[id_1: int, id_2: int, cmp_fname_c1: double, cmp_fname_c2: double, cmp_lname_c1: double, cmp_lname_c2: double, cmp_sex: int, cmp_bd: int, cmp_bm: int, cmp_by: int, cmp_plz: int, is_match: boolean]

In [13]:
# Preview the first 3 rows of the loaded DataFrame.
parse.show(3)

+-----+-----+-----------------+------------+------------+------------+-------+------+------+------+-------+--------+
| id_1| id_2|     cmp_fname_c1|cmp_fname_c2|cmp_lname_c1|cmp_lname_c2|cmp_sex|cmp_bd|cmp_bm|cmp_by|cmp_plz|is_match|
+-----+-----+-----------------+------------+------------+------------+-------+------+------+------+-------+--------+
|37291|53113|0.833333333333333|        null|         1.0|        null|      1|     1|     1|     1|      0|    true|
|39086|47614|              1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|70031|70237|              1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
+-----+-----+-----------------+------------+------------+------------+-------+------+------+------+-------+--------+
only showing top 3 rows



In [14]:
# Convert the DataFrame into an RDD of Row objects.
# NOTE(review): this rebinds `rdd`, replacing the RDD built in the earlier cells.
rdd = parse.rdd
# Emit one (column_index, value) pair per field of every row:
# (0, id_1), (1, id_2), ..., one pair per column.
flat_mapped_rdd = rdd.flatMap(lambda row: list(enumerate(row)))

In [15]:
# take(2) fetches only the first two pairs instead of shipping the whole RDD to the
# driver with collect() and slicing it there; the result is the same.
flat_mapped_rdd.take(2)

[(0, 37291), (1, 53113)]

In [None]:
# Show the (column_index, value) pairs of the first record only -- one pair per
# column -- instead of collecting and printing every pair of the whole dataset.
flat_mapped_rdd.take(len(parse.columns))

In [17]:
import math
def sqrt_value(value, offset=1.3):
    """Return ``math.sqrt(value) + offset``, passing missing values through.

    Meant for ``mapValues`` over the (column_index, value) pairs, where null
    fields arrive as ``None``.

    Args:
        value: a non-negative number (bools behave as 0/1), or None.
        offset: constant added after the square root; defaults to 1.3, the
            value used in this notebook's example.

    Returns:
        ``math.sqrt(value) + offset``, or None when ``value`` is None.

    Raises:
        ValueError: if ``value`` is negative (raised by ``math.sqrt``).
    """
    if value is None:
        return None
    return math.sqrt(value) + offset

# Transform every value; the keys (column indexes) are unchanged and nulls stay None.
mapped_rdd = flat_mapped_rdd.mapValues(sqrt_value)

# Only the first two pairs are displayed, so take(2) rather than collecting everything.
mapped_rdd.take(2)

[(0, 194.40877763581852), (1, 231.76257830719504)]

In [23]:
# List the column names of the loaded DataFrame.
parse.columns

list