## PySpark

**PySpark** es la interfaz de programación de **Python** para el framework de procesamiento distribuido **Apache Spark**.

**Spark** es un motor de procesamiento de datos distribuido y de alto rendimiento que se utiliza para procesar grandes volúmenes de datos de manera escalable y eficiente en clústeres de computadoras.

**PySpark** se utiliza comúnmente para tareas de procesamiento de datos, aprendizaje automático, análisis de datos en tiempo real, y para la construcción de aplicaciones de procesamiento de grandes volúmenes de datos.

In [None]:
import pyspark
from pyspark.sql import SparkSession

# Build (or reuse) the SparkSession that every following cell relies on.
spark = (
    SparkSession.builder
    .appName("pyspark_teoria")
    .getOrCreate()
)
spark

In [None]:
# Number of cores Spark can schedule tasks on.
# getExecutorMemoryStatus() counts block managers (driver + executors), not
# cores, so it reported 1 in local mode. defaultParallelism defaults to the
# total number of cores available (e.g. all local cores for local[*]),
# unless spark.default.parallelism has been set explicitly.
cores = spark.sparkContext.defaultParallelism
cores

### Cargar un df

In [None]:
# Leer un archivo con PySpark
titanic = spark.read.csv(path        = "/kaggle/input/d/danielwtummler/pyspark-teoria-practica/titanic.txt",
                         inferSchema = True, header = True)

In [None]:
titanic

In [None]:
titanic.show(5, truncate = True)

In [None]:
titanic.limit(4).toPandas()

### Data Validation 

In [None]:
titanic.printSchema()

In [None]:
titanic.columns

In [None]:
titanic.describe().toPandas()

In [None]:
titanic.schema["Ticket"].dataType

In [None]:
titanic.select("age", "fare").summary("count", "min", "max", "mean").show()

### Especificar dtypes de columnas

In [None]:
from pyspark.sql.types import *

In [None]:
# Without an explicit schema the JSON reader infers the types; here every
# field (including "timestamp") is read as a string.

people = spark.read.json(path = "/kaggle/input/d/danielwtummler/pyspark-teoria-practica/people.json")

# printSchema() prints the schema itself and returns None, so it must not be
# wrapped in print() (that printed a stray "None" after the schema).
people.printSchema()

people.limit(4).toPandas()

In [None]:
# Cambiamos el dtype de "timestamp" a DateType()

data_schema = list((StructField("name"      , StringType(), True),
                    StructField("email"     , StringType(), True),
                    StructField("city"      , StringType(), True),
                    StructField("mac"       , StringType(), True),
                    StructField("timestamp" ,   DateType(), True),
                    StructField("creditcard", StringType(), True)))

final_struc = StructType(fields = data_schema)

In [None]:
# Leemos el archivo otra vez pero especificando el schema

people = spark.read.json(path   = "/kaggle/input/d/danielwtummler/pyspark-teoria-practica/people.json",
                         schema = final_struc)

In [None]:
people.limit(4).toPandas()

In [None]:
people.printSchema()

### Buscar y Filtrar

In [None]:
from pyspark.sql.types import *

In [None]:
fifa = spark.read.csv(path        = "/kaggle/input/d/danielwtummler/pyspark-teoria-practica/fifa19.csv",
                      inferSchema = True, header = True)

fifa.limit(4).toPandas()

In [None]:
fifa.printSchema()

In [None]:
# Para seleccionar columnas usamos .select y pasamos una lista con las columnas (los corchetes son opcionales)

fifa.select(["Nationality", "Name", "Age", "Photo"]).show(5, truncate = False)

In [None]:
# OrderBy, por defecto ascending = True

fifa.select(["Name", "Age"])\
    .orderBy(fifa["Age"]).show(5)

#fifa.select(["Name", "Age"])\
#    .orderBy(fifa["Age"].asc()).show(5)

In [None]:
# .desc()

fifa.select(["Name", "Age"])\
    .orderBy(fifa["Age"].desc()).show(5)

In [None]:
# Para filtrar por palabras podemos usar .where en conjunto con .like

fifa.select(["Name", "Club"])\
    .where(fifa.Club.like("%Barcelona%")).show(5, truncate = False)

In [None]:
# Podemos utilizar .substr() para hacer "slicing" a una cadena de caracteres

fifa.select("Photo", fifa.Photo.substr(-4, 4)).show(5, truncate = False)

In [None]:
# .isin similar a Pandas

fifa[fifa.Club.isin("FC Barcelona", "Juventus")].limit(5).toPandas()

In [None]:
# .where(), .startswith() y .endswith()
# Nota: los .where van uno detrás de otro.

# fifa.select("Name", "Club").where(fifa.Name.startswith("L")).where(fifa.Name.endswith("i")).show(5)

fifa.select("Name", "Club")                \
    .where(fifa.Name.startswith("L"))      \
    .where(fifa.Name.endswith("i")).show(5)

In [None]:
# df.shape[0]

fifa.count()

In [None]:
# .limit() para seleccionar el número de filas

df3 = fifa.limit(100)
df3.count()

In [None]:
# Nos quedamos con las primeras 5 columnas

col_list = fifa.columns[:5]
df3 = fifa.select(col_list)

In [None]:
# nuevo df
df3.show(5, False)

In [None]:
# .filter(condicion)

fifa.filter("Overall > 50").limit(5).toPandas()

In [None]:
# Podemos usar .filter en conjunto con .select

fifa.filter("Overall > 50").select(["Name", "Age"]).limit(5).toPandas()

In [None]:
# El orden no afecta el output .select .filter

fifa.select(["Name", "Age"]).filter("Overall > 50").limit(5).toPandas()

In [None]:
# Varias condiciones AND & OR

fifa.select(["Name", "Age", "Club"]).filter("Overall > 50 AND Age < 30 AND Club = 'FC Barcelona'").limit(5).toPandas()

In [None]:
fifa.select(["Name", "Age", "Club"]).filter("Club = 'Juventus' OR Club = 'FC Barcelona'").limit(5).toPandas()

In [None]:
# .collect() "transforma" el output a list

result = fifa.filter("Overall > 50")                           \
             .select(["Nationality", "Name", "Age", "Overall"])\
             .orderBy(fifa["Overall"].desc()).collect()

result

In [None]:
# result
print("Mejor jugador Overall>50", result[0][1])

In [None]:
# A DataFrame cannot be indexed by row: fifa[0] is the first *column* (a Column
# expression), so fifa[0][1] printed an expression instead of a player name.
# To read a single value, sort the DataFrame and fetch one Row with .first().
best_row = fifa.filter("Overall > 50").orderBy(fifa["Overall"].desc()).first()
print("Mejor jugador Overall>50", best_row["Name"])

In [None]:
# result only holds players with Overall > 50, sorted by Overall descending,
# so its last Row is the lowest-rated player *within that filter*.
print("Peor jugador Overall>50", result[-1][1])

### Manipulación de DataFrames

In [None]:
from pyspark.sql.functions import *

# concat_ws()

concat = fifa.select(fifa.Name,
                     fifa.Nationality,
                     concat_ws(" ", fifa.Name, fifa.Nationality).alias("Nombre/Nacionalidad"))

concat.show(truncate = False)

In [None]:
concat.rdd.id()

In [None]:
# Nuevo df

videos = spark.read.csv(path = "/kaggle/input/d/danielwtummler/pyspark-teoria-practica/youtubevideos.csv",
                        header = True, inferSchema = True)

videos.limit(3).toPandas()

In [None]:
videos.printSchema()

In [None]:
# .withColumn with an existing column name replaces that column; combined with
# .cast, to_date or to_timestamp it changes the column's type.
# Spark datetime patterns are case-sensitive: "MM" is month-of-year while "mm"
# is minute-of-hour. trending_date is laid out as year.day.month, so the
# pattern needs an upper-case "MM" (the former "yy.dd.mm" parsed the month
# as minutes).

df = (videos
      .withColumn("views",         videos["views"].cast(IntegerType()))
      .withColumn("likes",         videos["likes"].cast(IntegerType()))
      .withColumn("dislikes",      videos["dislikes"].cast(IntegerType()))
      .withColumn("category_id",   videos["category_id"].cast(IntegerType()))
      .withColumn("trending_date", to_date(videos["trending_date"], "yy.dd.MM")))

In [None]:
df.printSchema()

In [None]:
df.limit(3).toPandas()

In [None]:
# .withColumn() también nos permite crear columnas a partir de otras

df = df.withColumn("publish_time_2", regexp_replace(df.publish_time, "T", " "))
df = df.withColumn("publish_time_2", regexp_replace(df.publish_time_2, "Z", ""))

df.select("publish_time", "publish_time_2").show(5, truncate = False)

In [None]:
# lower()
df.select("title", lower(df.title)).show(5, False)

In [None]:
# when(), puede crear columnas a partir de otras si se cumple cierta condición

df.select("likes",
          "dislikes",
          (when(df.likes > df.dislikes, "Good").when(df.likes < df.dislikes, "Bad").when(df.likes == df.dislikes, "Equal")\
          .otherwise("Undetermined")).alias("Favorability")).show(5)

# otherwise() se usa cuando no se resuelve la condicion, y esto puede suceder, por ejemplo, cuando hay NaN's

In [None]:
# expr

# con expr podemos escribir en sintaxis SQL como queremos la nueva columna

df.select("likes",
          "dislikes",
          expr("CASE WHEN likes > dislikes THEN 'Good' \
                     WHEN dislikes > likes THEN 'Bad'  \
                     ELSE 'Undetermined' END           \
                AS Favorability")).show(5)

In [None]:
# year() y month()
# Esto funciona porque la columna esta en formato DateType()

df.select("trending_date",
          year("trending_date").alias("year"),
          month("trending_date").alias("month")).show(5)

In [None]:
# datediff(end, start) returns the number of days from start to end.
# trending_date is a DateType column, but publish_time_2 is a *string* built
# with regexp_replace, so it is converted explicitly with to_date rather than
# relying on an implicit cast. Passing trending_date first yields the days a
# video took to reach trending (non-negative when it trended after being
# published); the previous argument order produced negative values.

df.select("trending_date",
          "publish_time_2",
          datediff(df.trending_date, to_date(df.publish_time_2)).alias("days_to_trending")).show(10, False)

In [None]:
# split()
array = df.select("title",
                  split(df.title, " ").alias("split"))

array.show(5, False)

In [None]:
# array_contains parecido a "in" en python

array.select("split",
             array_contains(array.split, "(HBO)")).show(5, False)

In [None]:
# array_distinct parecido a .unique() en Pandas

array.select("title", array_distinct(array.split)).show(10, False)

In [None]:
# array_remove eliminar un elemento de un array 

array.select("title", array_remove(array.split, "Presidency:")).show(5, False)

### UDF

In [None]:
# Podemos usar funciones para crear nuevas columnas

from pyspark.sql.functions import udf          # user define functions
from pyspark.sql.types import IntegerType

In [None]:
# Plain Python function that is wrapped as a Spark UDF below.

def square(x):
    """Return the square of ``x`` converted to ``int``.

    ``None`` is not accepted (``None ** 2`` raises TypeError), so null values
    must be removed before the function is applied.
    """
    result = x ** 2
    return int(result)

# udf() turns a plain Python function into a Spark column function. square can
# be passed directly; wrapping it in a lambda added nothing.
square_udf = udf(f          = square,
                 returnType = IntegerType())

# square() raises on None, so the null "dislikes" rows are removed *before* the
# UDF is applied instead of relying on the optimizer to push a later filter
# below the projection.
(df.where(col("dislikes").isNotNull())
   .select("dislikes", square_udf("dislikes").alias("dislikes**2"))
   .show(5))

In [None]:
# Si ejecutamos sin usar .isNotNull() nos dará error porque hay NaN's
# df.select("dislikes", square_udf("dislikes")).show(5)

### Aggregate Functions

In [None]:
# groupBy + agg, analogous to pandas' .groupby().agg(); the dict maps a column
# name to the name of the aggregate function applied to it.

(fifa.groupBy(["Club", "Nationality"])
     .agg({"ID": "count"})
     .show(n=1_000, truncate=False))

In [None]:
import pandas as pd

# The same aggregation in pandas, for comparison.
df_fifa = pd.read_csv("/kaggle/input/d/danielwtummler/pyspark-teoria-practica/fifa19.csv")

df_fifa.groupby(by=["Club", "Nationality"]).agg({"ID": "count"})

In [None]:
# Con esta notación podemos agregar .alias a las columnas

fifa.groupBy("Club").agg(min(fifa.Age).alias("Min Age"),
                         max(fifa.Age).alias("Max Age")).show()

In [None]:
# Con .summary() podemos obtener un resultado similar

videos.select("views", "likes", "dislikes")                                      \
      .summary("count", "min", "25%", "75%", "max", "stddev").limit(6).toPandas()

### Joins

In [None]:
titanic1 = spark.read.csv(path = "/kaggle/input/d/danielwtummler/pyspark-teoria-practica/titanic 1.csv",
                          inferSchema = True, header = True)

titanic2 = spark.read.csv(path = "/kaggle/input/d/danielwtummler/pyspark-teoria-practica/titanic 2.csv",
                          inferSchema = True, header = True)

In [None]:
titanic1.limit(3).toPandas()

In [None]:
titanic2.limit(3).toPandas()

In [None]:
# .union funciona como pd.concat, solo funciona para axis = 0
# Los dfs deben tener la misma cantidad de columnas para funcionar
# Agrega las filas

titanic = titanic1.union(titanic1)

print(titanic1.count())
print(titanic.count())

In [None]:
# Inner Joins
titanic = titanic1.join(other = titanic2, on = ["PassengerId"], how = "inner")

titanic.show()

### Missing Values

In [None]:
# Rows whose Age is null, found with Column.isNull().

(titanic.select("Name", "PassengerId", "Age")
        .where(titanic["Age"].isNull())
        .show(5))

In [None]:
# Con esta funcion podemos contar cuantas filas tienen NaN's

from pyspark.sql.functions import *

def null_value_calc(df):
    """Count the null values in every column of a Spark DataFrame.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        The DataFrame to inspect.

    Returns
    -------
    list of tuple
        One ``(column_name, null_count, null_percentage)`` tuple for each
        column that contains at least one null, in the DataFrame's column
        order. Columns without nulls are omitted, so an empty DataFrame
        yields an empty list.
    """
    from pyspark.sql import functions as F

    # Row count and every per-column null count are computed in ONE
    # aggregation pass; the previous version launched a separate Spark job
    # for each column. count(expr) ignores nulls, and when() without
    # otherwise() is null unless the column value is null, so each count
    # equals that column's number of nulls.
    aggregates = [F.count(F.lit(1))]
    aggregates += [F.count(F.when(F.col(c).isNull(), 1)) for c in df.columns]
    totals = df.agg(*aggregates).first()

    num_rows = totals[0]
    null_columns_counts = []
    # Pair names with values positionally so duplicate column names cannot
    # make a by-name Row lookup ambiguous.
    for column_name, null_rows in zip(df.columns, totals[1:]):
        if null_rows > 0:
            null_columns_counts.append(
                (column_name, null_rows, (null_rows / num_rows) * 100)
            )

    return null_columns_counts

# (column, null count, null %) for every column of the joined titanic data.
null_columns_calc_list = null_value_calc(df=titanic)

null_columns_calc_list

In [None]:
# Show the null report as a Spark DataFrame. The explicit schema matches what
# Spark would infer from the (str, int, float) tuples, but it also works when
# no column has nulls and the list is empty (schema inference fails on an
# empty dataset).
spark.createDataFrame(data = null_columns_calc_list,
                      schema = "Name string, Count long, Percent double").show()

In [None]:
# df.na.drop() = df.dropna()

titanic.na.drop().limit(6).toPandas()

In [None]:
# .na.drop() sin parametros

og_len = titanic.count()
drop_len = titanic.na.drop().count()

print("Filas eliminadas", og_len - drop_len)
print("Porcentaje de filas eliminadas", (og_len - drop_len)/og_len*100)

In [None]:
# .na.drop() con threshold = 8

og_len = titanic.count()
drop_len = titanic.na.drop(thresh = 8).count()

print("Filas eliminadas", og_len - drop_len)
print("Porcentaje de filas eliminadas", (og_len - drop_len)/og_len*100)

In [None]:
# .na.drop() con threshold = 6

og_len = titanic.count()
drop_len = titanic.na.drop(thresh = 6).count()
print("Filas eliminadas", og_len - drop_len)

print("Porcentaje de filas eliminadas", (og_len - drop_len)/og_len*100)

In [None]:
# .na.drop() podemos elegir por cual columna eliminar las filas

og_len = titanic.count()
drop_len = titanic.na.drop(subset = ["Age"]).count()

print("Filas eliminadas", og_len - drop_len)
print("Porcentaje de filas eliminadas", (og_len - drop_len)/og_len*100)

In [None]:
# .na.drop() con how = "all" (toda la fila debe tener NaN's)

og_len = titanic.count()
drop_len = titanic.na.drop(how = "all").count()

print("Filas eliminadas", og_len - drop_len)
print("Porcentaje de filas eliminadas", (og_len - drop_len)/og_len*100)

### Fill NaN's

In [None]:
# na.fill(value), "value" debe coincidir con el dtype de la columna
# Si esto no se cumple, na.fill() no hará nada

titanic.na.fill(value = 9999).limit(6).toPandas()

In [None]:
# fila 6
titanic.na.fill(value = "NO AGE").limit(6).toPandas()

In [None]:
# Podemos hacer fill a una columna especifica

titanic.na.fill(value = 9999, subset = ["Age"]).limit(6).toPandas()

In [None]:
# En una linea

titanic.filter(titanic.Age.isNull()).na.fill(value = 9999, subset = ["Age"]).limit(5).toPandas()

In [None]:
# Cambia los NaN's por el promedio de la columna

def fill_with_mean(df, include = None):
    """Replace the nulls in selected columns with that column's mean.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        The DataFrame to fill.
    include : iterable of str, optional
        Names of the columns to fill. Names not present in ``df`` are
        ignored. When None or empty, ``df`` is returned unchanged (the old
        ``set()`` default made ``df.agg()`` fail with no expressions).

    Returns
    -------
    pyspark.sql.DataFrame
        A new DataFrame; ``df`` itself is not modified.
    """
    from pyspark.sql import functions as F

    # Materialize once: gives O(1) membership and avoids exhausting a
    # one-shot iterator passed as `include`.
    wanted = set(include or ())
    columns = [c for c in df.columns if c in wanted]
    if not columns:
        return df

    means = df.agg(*(F.avg(c).alias(c) for c in columns)).first().asDict()

    # An all-null column has a null mean, and na.fill rejects None as a
    # replacement value, so such columns are left untouched.
    fill_values = {c: m for c, m in means.items() if m is not None}
    if not fill_values:
        return df

    return df.na.fill(value = fill_values)

In [None]:
updated_df = fill_with_mean(titanic, ["Age"])

In [None]:
# fila 6
updated_df.limit(6).toPandas()

In [None]:
################################################################################################################################