In [None]:
import os

os.environ['JAVA_HOME'] = "C:/Program Files/Java/jdk-11"
os.environ['PYSPARK_PYTHON'] = "C:/Users/usr/anaconda3/envs/pyspark_env/python.exe"
os.environ['PYSPARK_DRIVER_PYTHON'] = "C:/Users/usr/anaconda3/envs/pyspark_env/python.exe"
os.environ['HADOOP_HOME'] = "C:/hadoop-3.4.0"
os.environ['HADOOP_COMMON_LIB_NATIVE_DIR'] = "C:/hadoop-3.4.0/lib/native"
os.environ['PATH'] += os.pathsep + "C:/hadoop-3.4.0/bin"

import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

sc = spark.sparkContext

### **Crear un DataFrame a partir de un RDD**

In [None]:
rdd = sc.parallelize([item for item in range(10)]).map(lambda x: (x, x ** 2)) # Queremos que nos devuelva tuplas.

In [None]:
rdd.collect()

In [None]:
df = rdd.toDF(['numero', 'cuadrado']) # Le damos una lista con los nombres de columnas que queremos en el DF.

In [None]:
df.printSchema()

- long es el tipo de dato.

- nullable es si acepta nulos o no.

In [None]:
df.show()

- Crear un DF a partir de un RDD con schema.

In [None]:
rdd1 = sc.parallelize([(1, 'Jose', 35.5), (2, 'Teresa', 54.3), (3, 'Katia', 12.7)])

- 2 formas para crear el schema.

In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

- Vía uno.

In [None]:
esquema1 = StructType(
    [
        StructField('id', IntegerType(), True), # Nombre de la columna, tipo de dato, si admite nulos o no.
        StructField('nombre', StringType(), True),
        StructField('saldo', DoubleType(), True)
    ]
)

Vía dos.

In [None]:
esquema2 = "`id` INT, `nombre` STRING, `saldo` DOUBLE"

In [None]:
df1 = spark.createDataFrame(rdd1, schema = esquema1)

In [None]:
df1.printSchema()

In [None]:
df1.show() # Muestra por default las primeras 20 filas del DF.

In [None]:
df2 = spark.createDataFrame(rdd1, schema = esquema2)

In [None]:
df2.printSchema()

In [None]:
df2.show()

### **Crear un DF a partir de fuentes de datos**

- format() no es opcional, option() y schema() si.

- Crear un DF a partir de un **archivo** de **texto**.

In [None]:
df3 = spark.read.text('./data/data/dataTXT.txt')

In [None]:
df3.show()

In [None]:
df3.show(truncate = False) # Para que muestre todo el texto.

- Crear un DF a partir de un **CSV**.

In [None]:
df4 = spark.read.csv('./data/data/dataCSV.csv')

In [None]:
df4.show()

- El nombre de las columnas lo ha generado pero en realidad el nombre de las columnas debería ser la primera fila.

In [None]:
df4 = spark.read.option('header', 'true').csv('./data/data/dataCSV.csv') # Le decimos que el título del campo es la primera fila.

In [None]:
df4.show()

- Leer archivo de texto con un **delimitador diferente**.

In [None]:
df5 = spark.read.option('header', 'true').option('delimiter', '|').csv('./data/data/dataTab.txt')

In [None]:
df5.show()

- El delimiter es que en este caso el archivo de texto no estaban los datos delimitados por comas sino por '|'

- Crear un DF a partir de un **JSON** proporcionando un **schema**.

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType

In [None]:
json_schema = StructType(
    [
        StructField('color', StringType(), True),
        StructField('edad', IntegerType(), True),
        StructField('fecha', DateType(), True),
        StructField('pais', StringType(), True)
    ]
)

In [None]:
df6 = spark.read.schema(json_schema).json('./data/data/dataJSON.json')

In [None]:
df6.show()

In [None]:
df6.printSchema()

- Crear un DF a partir de un archivo **parquet**.

In [None]:
df7 = spark.read.parquet('./data/data/dataPARQUET.parquet')

In [None]:
df7.show()

- Tenemos otra opción para la lectura.

In [None]:
df8 = spark.read.format('parquet').load('./data/data/dataPARQUET.parquet')

In [None]:
df8.printSchema()

### **Trabajo con columnas**

- Al igual que en las operaciones con RDD, las operaciones estructuradas (con DF y SQL) tienen 2 categorías: **transformación** y **acción**.

- Los **DF** son **inmutables** y sus operaciones de transformación siempre devuelven un **DF nuevo**.

In [None]:
df9 = spark.read.parquet('./data/data/dataPARQUET.parquet')

In [None]:
df9.printSchema()

- **Primera** alternativa para **referirnos** a las columnas.

In [None]:
df9.select('title').show()

- **Segunda** alternativa.

In [None]:
from pyspark.sql.functions import col

In [None]:
df9.select(col('title')).show()

### **Transformaciones: funciones select() y selectExpr()**

- **select()**

In [None]:
df10 = spark.read.parquet('./data/data/dataPARQUET.parquet')

In [None]:
df10.printSchema()

In [None]:
from pyspark.sql.functions import col

In [None]:
df10.select(col('video_id')).show()

In [None]:
df10.select('video_id', 'trending_date').show() # La otra forma.

- select() presenta la desventaja de que no puedo construir expresiones dentro. Por ejemplo esto dará error:

In [None]:
# df10.select(
#     'likes',
#     'dislikes',
#     ('likes' - 'dislikes') # Una nueva columna que sea la resta de likes y dislikes.
# )

- Tenemos que hacerlo con 'col'

In [None]:
df10.select(
    col('likes'),
    col('dislikes'),
    (col('likes') - col('dislikes')).alias('aceptacion')
).show()

- **selectExpr**

In [None]:
df10.selectExpr('likes', 'dislikes', '(likes - dislikes) as aceptacion').show() # La otra forma.

In [None]:
df10.selectExpr('count(distinct (video_id)) as videos').show()

### **Transformaciones: funciones filter() y where()**

In [None]:
df11 = spark.read.parquet('./data/data/dataPARQUET.parquet')

- **filter()**

In [None]:
from pyspark.sql.functions import col

In [None]:
df11.show()

In [None]:
df11.filter(col('video_id') == '2kyS6SvSYSE').show() # Nos trae las filas que coinciden con ese valor de video_id.

- **where()**

- Es muy similar a filter()

In [None]:
df12 = spark.read.parquet('./data/data/dataPARQUET.parquet').where(col('trending_date') != '17.14.11')

In [None]:
df12.show()

In [None]:
df13 = spark.read.parquet('./data/data/dataPARQUET.parquet').where(col('likes') > 5000)

In [None]:
df13.filter((col('trending_date') != '17.14.11') & (col('likes') > 7000)).show()

- También podríamos haberlo hecho de esta otra forma.

In [None]:
df13.filter(col('trending_date') != '17.14.11').filter(col('likes') > 7000).show()

### **Transformaciones: funciones distinct() y dropDuplicates()**

- Tienen la misma función pero dropDuplicates() nos permite definir la lógica.

In [None]:
df14 = spark.read.parquet('./data/data/dataPARQUET.parquet')

- **distinct()**

In [None]:
df_sin_duplicados = df14.distinct() # Elimina los duplicados.

In [None]:
print('El conteo del dataframe original es {}'.format(df14.count()))
print('El conteo del dataframe sin duplicados es {}'.format(df_sin_duplicados.count()))

- **dropDuplicates()**

- También podríamos llamarlo sin llamar ningún parámetro, sería lo mismo que distinct()

In [None]:
dataframe = spark.createDataFrame([(1, 'azul', 567), (2, 'rojo', 487), (1, 'azul', 345), (2, 'verde', 783)]).toDF('id', 'color', 'importe')

In [None]:
dataframe.show()

In [None]:
dataframe.dropDuplicates(['id', 'color']).show() # Aquellas filas en estas columnas que tienen valores iguales.

### **Transformaciones: funciones sort(), orderBy() y limit()**

In [None]:
from pyspark.sql.functions import col

In [None]:
df15 = spark.read.parquet('./data/data/dataPARQUET.parquet')

In [None]:
df15 = df15.select(col('likes'), col('views'), col('video_id'), col('dislikes')).dropDuplicates(['video_id'])

In [None]:
df15.show()

- **sort()**

In [None]:
df15.sort('likes').show()

In [None]:
from pyspark.sql.functions import desc

In [None]:
df15.sort(desc('likes')).show()

- **orderBy()**

In [None]:
df15.orderBy(col('views')).show()

In [None]:
df15.orderBy(col('views').desc()).show()

In [None]:
d16 = spark.createDataFrame([(1, 'azul', 568), (2, 'rojo', 235), (1, 'azul', 456), (2, 'azul', 783)]).toDF('id', 'color', 'importe')

In [None]:
d16.show()

In [None]:
d16.orderBy(col('color').desc(), col('importe')).show()

- **limit()**

In [None]:
top_10 = df15.orderBy(col('views').desc()).limit(10)

In [None]:
top_10.show()

### **Transformaciones: funciones withColumn() y withColumnRenamed()**

- **withColumn()**

In [None]:
df17 = spark.read.parquet('./data/data/dataPARQUET.parquet')

In [None]:
from pyspark.sql.functions import col

In [None]:
df_valoracion = df17.withColumn('valoracion', col('likes') - col('dislikes'))

In [None]:
df_valoracion.printSchema()

In [None]:
df_valoracion1 = (df17.withColumn('valoracion', (col('likes') - col('dislikes')))
                  .withColumn('res:div', col('valoracion') % 10)
)

In [None]:
df_valoracion1.printSchema()

In [None]:
df_valoracion1.select('likes', 'dislikes', 'valoracion', 'res:div').show()

- **withColumnRenamed()**

In [None]:
df_renombrado = df17.withColumnRenamed('video_id', 'id')

In [None]:
df_renombrado.printSchema()

- Spark no arrojará error si el nombre de la columna original no existe.

In [None]:
df_error = df17.withColumnRenamed('nombre_que_no_existe', 'otro_nombre')

In [None]:
df_error.printSchema()

### **Transformaciones: funciones drop(), sample() y randomSplit()**

In [None]:
df18 = spark.read.parquet('./data/data/dataPARQUET.parquet')

- **drop()**

- Si no existe la columna en el schema, se ignorará.

In [None]:
df18.printSchema()

In [None]:
df_util = df18.drop('comments_disabled')

In [None]:
df_util.printSchema()

In [None]:
df_util = df18.drop('comments_disabled', 'ratings_disabled', 'thumbnail_link')

In [None]:
df_util.printSchema()

In [None]:
df_util = df18.drop('comments_disabled', 'ratings_disabled', 'thumbnail_link', 'cafe')

In [None]:
df_util.printSchema()

- **sample()**

In [None]:
df_muestra = df18.sample(0.8) # El 80% aproximadamente de las filas de DF original.

In [None]:
num_filas = df18.count()
num_filas_muestra = df_muestra.count()

print('El 80% de las filas del dataframe original es {}'.format(num_filas - (num_filas * 0.2)))
print('El número de filas del dataframe muestra es {}'.format(num_filas_muestra))

In [None]:
df_muestra = df18.sample(fraction=0.8, seed=1234)

- con seed tomaremos posteriormente la misma muestra si lo deseamos, especificando la misma seed.

In [None]:
df_muestra = df18.sample(withReplacement=True, fraction=0.8, seed=1234)

- withReplacement permite seleccionar más de una vez la misma fila.

- **randomSplit()**

- Devuelve uno o más DF dependiendo de la cantidad de pesos que le especifiquemos.

In [None]:
train, test = df18.randomSplit([0.8, 0.2], seed=1234) # Si no llega a 1 se normalizará automáticamente para llegar a 1.

### **Trabajo con datos incorrectos o faltantes**

In [None]:
df19 = spark.read.parquet('./data/data/dataPARQUET.parquet')

In [None]:
df19.count()

In [None]:
df19.na.drop().count()

- Otra alternativa.

In [None]:
df19.na.drop('any').count()

Otra alternativa.

In [None]:
df19.dropna().count()

In [None]:
df19.na.drop(subset=['views']).count() # Elimina las filas que tengan valores nulos en la columna 'views'.

In [None]:
df19.na.drop(subset=['views', 'dislikes']).count()

In [None]:
from pyspark.sql.functions import col

In [None]:
df19.orderBy(col('views')).select(col('views'), col('likes'), col('dislikes')).show()

- Imputar un valor a los nulos.

In [None]:
df19.fillna(0).orderBy(col('views')).select(col('views'), col('likes'), col('dislikes')).show()

- Rellenar solo ciertas columnas.

In [None]:
df19.fillna(0, subset = ['likes', 'dislikes']).orderBy(col('views')).select(col('views'), col('likes'), col('dislikes')).show()

### **Acciones sobre un DF en Spark SQL**

- Igual que en el RDD, desencadenarán todas las transformaciones acumuladas en el DAG.

In [None]:
df20 = spark.read.parquet('./data/data/dataPARQUET.parquet')

- **show()**

In [None]:
df20.show()

In [None]:
df20.show(5)

In [None]:
df20.show(5, truncate = False)

- **take()**

In [None]:
df20.take(1)

- **head()**

In [None]:
df20.head(1)

- **collect()**

In [None]:
from pyspark.sql.functions import col

In [None]:
df20.select(col('likes')).collect()

### **Escritura de DF**

In [None]:
df21 = spark.read.parquet('./data/data/dataPARQUET.parquet')

In [None]:
df22 = df21.repartition(2)

In [None]:
# df22.write.format('csv').option('sep', '|').save('./data/output/csv')

- Crea 2 archivos, por las 2 particiones.

- Si queremos que nos devuelva solo un archivo.

In [None]:
# df22.coalesce(1).write.format('csv').option('sep', '|').save('./data/output/csv1')

- También podemos guardar un DF al mismo tiempo que lo particionamos.

In [None]:
df22.printSchema()

In [None]:
df22.select('comments_disabled').distinct().show()

- Limpiamos el DF con los valores que nos interesan.

In [None]:
from pyspark.sql.functions import col

In [None]:
df_limpio = df22.filter(col('comments_disabled').isin('True', 'False'))

In [None]:
# df_limpio.write.partitionBy('comments_disabled').parquet('./data/data/parquet')

- Va a crear tantas carpetas como valores tenga la columna en la cual estamos haciendo el partition()

- En este caso, 'True' y 'False'