In [1]:
!pip install pyspark



### Creación de una sesión de Spark SQL

Nota: documentación oficial de SparkSQL: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html

In [2]:
# Solo se necesitan si se corre con spark-submit (o si se corre en Google Colab)
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Ejemplo').getOrCreate()

In [3]:
type(spark)

pyspark.sql.session.SparkSession

### Creación de un Data Frame

In [4]:
# Si se corre Spark en modo local, por defecto lee del disco local (file:///)
df = spark.read.json('/content/personas.json')
type(df)

pyspark.sql.dataframe.DataFrame

In [5]:
# Mostrar contenido del dataframe
df.show()

+----+--------+------+
|edad|estatura|nombre|
+----+--------+------+
|null|    null|  Juan|
|  30|     175|Carlos|
|  25|    null| Pedro|
|null|     162|Teresa|
+----+--------+------+



In [6]:
# Mostrar esquema
df.printSchema()

root
 |-- edad: long (nullable = true)
 |-- estatura: long (nullable = true)
 |-- nombre: string (nullable = true)



In [7]:
# Resumen de los datos
df.describe().show()

+-------+------------------+-----------------+------+
|summary|              edad|         estatura|nombre|
+-------+------------------+-----------------+------+
|  count|                 2|                2|     4|
|   mean|              27.5|            168.5|  null|
| stddev|3.5355339059327378|9.192388155425117|  null|
|    min|                25|              162|Carlos|
|    max|                30|              175|Teresa|
+-------+------------------+-----------------+------+



### Acceso a los datos

In [8]:
# Nombres de las columnas
df.columns

['edad', 'estatura', 'nombre']

In [9]:
# Tipo de dato para las columnas
type(df.columns)

list

In [10]:
# Acceder a los datos de una columna o de varias columnas
df.select('edad').show()
df.select(['nombre', 'edad']).show()

+----+
|edad|
+----+
|null|
|  30|
|  25|
|null|
+----+

+------+----+
|nombre|edad|
+------+----+
|  Juan|null|
|Carlos|  30|
| Pedro|  25|
|Teresa|null|
+------+----+



In [11]:
# Extraccion de filas
df.head(2)[1]

Row(edad=30, estatura=175, nombre='Carlos')

In [12]:
# Recuperación de valores
dato = df.head(2)[1]
dato.edad          # Equivalente a dato[0]

30

### Cambio del esquema

In [13]:
# Asignación de un nuevo esquema a los datos (cambiar edad a string y estatura a int)
from pyspark.sql.types import (StructField, StructType, 
                               StringType, IntegerType)

# Creación del nuevo esquema
d_schema = [StructField('edad', StringType(), True),
            StructField('estatura', IntegerType(), True),
            StructField('nombre', StringType(), True),
           ]
schema_nuevo = StructType(fields = d_schema)
schema_nuevo

StructType(List(StructField(edad,StringType,true),StructField(estatura,IntegerType,true),StructField(nombre,StringType,true)))

In [14]:
# Lectura de los datos asignando el nuevo esquema
df2 = spark.read.json('personas.json', schema=schema_nuevo)

# Mostrar el nuevo esquema
df2.printSchema()

#df2.show()
#df2.describe().show()

root
 |-- edad: string (nullable = true)
 |-- estatura: integer (nullable = true)
 |-- nombre: string (nullable = true)



### Creación de nuevas columnas

In [15]:
# Cambiar nombre a una columna
df3 = df.withColumnRenamed('estatura', 'altura')
df3.show()

+----+------+------+
|edad|altura|nombre|
+----+------+------+
|null|  null|  Juan|
|  30|   175|Carlos|
|  25|  null| Pedro|
|null|   162|Teresa|
+----+------+------+



In [16]:
# Crear una nueva columna (llamada "edadx2")
df.withColumn('edadx2', df['edad']*2).show()
df.show()

+----+--------+------+------+
|edad|estatura|nombre|edadx2|
+----+--------+------+------+
|null|    null|  Juan|  null|
|  30|     175|Carlos|    60|
|  25|    null| Pedro|    50|
|null|     162|Teresa|  null|
+----+--------+------+------+

+----+--------+------+
|edad|estatura|nombre|
+----+--------+------+
|null|    null|  Juan|
|  30|     175|Carlos|
|  25|    null| Pedro|
|null|     162|Teresa|
+----+--------+------+



## Uso de SQL

Para poder usar consultas (query) de SQL se debe primero el  dataframe a una vista temporal usando `createOrReplaceTempView`. Luego sobre esta vista temporal es que se realizará las consultas.

In [17]:
# Crear una "tabla" llamada "personas" con los datos del dataframe  
df.createOrReplaceTempView('personas')

# Usar consultas de SQL sobre la tabla creada
result = spark.sql("SELECT * FROM personas")
result.show()

+----+--------+------+
|edad|estatura|nombre|
+----+--------+------+
|null|    null|  Juan|
|  30|     175|Carlos|
|  25|    null| Pedro|
|null|     162|Teresa|
+----+--------+------+



In [18]:
spark.sql("SELECT * FROM personas WHERE edad=30").show()

+----+--------+------+
|edad|estatura|nombre|
+----+--------+------+
|  30|     175|Carlos|
+----+--------+------+



In [19]:
spark.sql("SELECT * FROM personas WHERE nombre LIKE 'Juan'").show()

+----+--------+------+
|edad|estatura|nombre|
+----+--------+------+
|null|    null|  Juan|
+----+--------+------+



## Conversión de RDD a DataFrame

In [20]:
# Creación de un contexto de Spark
sc = spark.sparkContext
sc

In [21]:
# Lectura de un archivo y manipulación usando RDDs
rdd = sc.textFile("/content/texto1.txt")
rdd.take(2)

['Este es un texto de prueba', 'Utiliza funcionalidades de Spark']

In [22]:
# Contar palabras
rdd2 = rdd.flatMap(lambda x: x.lower().split()).map(lambda x: (x,1)).reduceByKey(lambda x,y: x+y)
rdd2.take(10)

[('este', 1),
 ('prueba', 1),
 ('utiliza', 1),
 ('funcionalidades', 1),
 ('especialmente', 1),
 ('usa', 1),
 ('con', 1),
 ('python', 3),
 ('scala', 1),
 ('lenguajes', 1)]

In [23]:
from pyspark.sql import Row

# Crear un RDD con filas de tipo "Row" que el DataFrame identifica
rdd3 = rdd2.map(lambda x: Row(palabra=x[0], contador=x[1], longitud=len(x[0])))
rdd3.take(5)

[Row(palabra='este', contador=1, longitud=4),
 Row(palabra='prueba', contador=1, longitud=6),
 Row(palabra='utiliza', contador=1, longitud=7),
 Row(palabra='funcionalidades', contador=1, longitud=15),
 Row(palabra='especialmente', contador=1, longitud=13)]

In [24]:
# Crear un DataFrame a partir del RDD
df = spark.createDataFrame(rdd3)
df.show(5)

+---------------+--------+--------+
|        palabra|contador|longitud|
+---------------+--------+--------+
|           este|       1|       4|
|         prueba|       1|       6|
|        utiliza|       1|       7|
|funcionalidades|       1|      15|
|  especialmente|       1|      13|
+---------------+--------+--------+
only showing top 5 rows



In [25]:
# Registrar el Inferir el esquema y registrar el RDD como una tabla
df.createOrReplaceTempView("tabla1")

In [26]:
# Consultas SQL en la tabla
spark.sql("SELECT palabra, longitud FROM tabla1 WHERE contador>=2").show()

+-------+--------+
|palabra|longitud|
+-------+--------+
| python|       6|
|     es|       2|
|     de|       2|
|  spark|       5|
|   usar|       4|
+-------+--------+

