In [17]:
#Imports
from pyspark.sql import SparkSession, functions
from pyspark.sql.functions import to_timestamp, col


In [3]:
#Se crea una sesión, o en caso de que ya exista se recupera
spark_session = SparkSession \
    .builder \
    .appName("Simple program to show the basics of Spark dataframes") \
    .getOrCreate()

spark_session.sparkContext.setLogLevel("ERROR")


In [4]:

#Leemos los datos y le decimos que infiera el schema, es decir los tipos de datos para cada variable
data_frame = spark_session \
    .read \
    .options(header='true', inferschema='true') \
    .option("delimiter", ",") \
    .option("timestampFormat", "yyyy-MM-dd") \
    .csv("C:/Users/cberd/Desktop/Master/Modulo6ProcesamientoDatosEscalable/Tareas/ProyectoPythonSpark/data/personalData.csv") \
    .persist()
  


In [9]:
#Podemos mostrar el schema inferido de la lectura
print("Data frame schema") 
data_frame.printSchema()

Data frame schema
root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Weight: double (nullable = true)
 |-- HasACar: boolean (nullable = true)
 |-- BirthDate: date (nullable = true)



In [11]:
#Se muestra el dataframe, por defecto show muestra solo los 20 primeros
print ("Data frame")
data_frame.show()

Data frame
+------+---+------+-------+----------+
|  Name|Age|Weight|HasACar| BirthDate|
+------+---+------+-------+----------+
|  Luis| 23|  84.5|   true|2019-02-28|
|  Lola| 42|  70.2|  false|2000-10-01|
|  Paco| 66|  90.1|  false|1905-12-03|
|Manolo| 68|  75.3|   true|2000-01-04|
+------+---+------+-------+----------+



In [13]:
#Podemos ver los tipos de cada atirbuto
print("data types: " + str(data_frame.dtypes))

data types: [('Name', 'string'), ('Age', 'int'), ('Weight', 'double'), ('HasACar', 'boolean'), ('BirthDate', 'date')]


In [15]:
#Se describe el dataframe, es decir ver valores estadísticos como la media,máximo, mínimo,...
print ("Describe the dataframe")
data_frame \
    .describe() \
    .show()


Describe the dataframe
+-------+----+------------------+-----------------+
|summary|Name|               Age|           Weight|
+-------+----+------------------+-----------------+
|  count|   4|                 4|                4|
|   mean|null|             49.75|80.02499999999999|
| stddev|null|21.391197566600457|8.951489633947338|
|    min|Lola|                23|             70.2|
|    max|Paco|                68|             90.1|
+-------+----+------------------+-----------------+



In [17]:
#Explica cómo Spark procesará los datos en la memoria y en disco
print ("Explain the dataframe")
data_frame \
    .explain()


Explain the dataframe
== Physical Plan ==
InMemoryTableScan [Name#17, Age#18, Weight#19, HasACar#20, BirthDate#21]
   +- InMemoryRelation [Name#17, Age#18, Weight#19, HasACar#20, BirthDate#21], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- FileScan csv [Name#17,Age#18,Weight#19,HasACar#20,BirthDate#21] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/C:/Users/cberd/Desktop/Master/Modulo6ProcesamientoDatosEscalable..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Name:string,Age:int,Weight:double,HasACar:boolean,BirthDate:date>




In [19]:
# Se transforma el formato de la fecha al formato usado en Spark 3.0
print ("Transform the date to the format used in Spark 3.0")
data_frame = data_frame.withColumn("BirthDate", to_timestamp("BirthDate", "yyyy-MM-dd"))

# Se muestra el resultado
data_frame.printSchema()
data_frame.show()

Transform the date to the format used in Spark 3.0
root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Weight: double (nullable = true)
 |-- HasACar: boolean (nullable = true)
 |-- BirthDate: timestamp (nullable = true)

+------+---+------+-------+-------------------+
|  Name|Age|Weight|HasACar|          BirthDate|
+------+---+------+-------+-------------------+
|  Luis| 23|  84.5|   true|2019-02-28 00:00:00|
|  Lola| 42|  70.2|  false|2000-10-01 00:00:00|
|  Paco| 66|  90.1|  false|1905-12-03 00:00:00|
|Manolo| 68|  75.3|   true|2000-01-04 00:00:00|
+------+---+------+-------+-------------------+



In [21]:
# Se pueden realizar operaciones de data frames comunes (select,group by, count, order by, where, join,...)


In [23]:
#Por ejemplo seleccionamos la columna de nombre y edad
print ("Select the name and age columns")
data_frame.select("Name", "Age").show()

Select the name and age columns
+------+---+
|  Name|Age|
+------+---+
|  Luis| 23|
|  Lola| 42|
|  Paco| 66|
|Manolo| 68|
+------+---+



In [25]:
 # Podríamos también hacer operaciones en las columnas que seleccionamos y al mostrarlas por consola nos indicaría la operación realizada
print ("Select columns name and age, but adding 1 to age")
data_frame.select("Name", data_frame["Age"] + 1) \
    .show()

Select columns name and age, but adding 1 to age
+------+---------+
|  Name|(Age + 1)|
+------+---------+
|  Luis|       24|
|  Lola|       43|
|  Paco|       67|
|Manolo|       69|
+------+---------+



In [29]:
#Ejemplo que indica para cada fila si el nombre es mayor que 4
print ("Indicates whether the rows have a name length > 4")
data_frame.select(functions.length(data_frame["Name"]) > 4).show()


Indicates whether the rows have a name length > 4
+------------------+
|(length(Name) > 4)|
+------------------+
|             false|
|             false|
|             false|
|              true|
+------------------+



In [31]:
#De forma similar podríamos ver si el nombre empieza por L
print ("Indicates whether the names start with L")
data_frame.select(data_frame["name"], data_frame["name"].startswith("L")) \
    .show()


Indicates whether the names start with L
+------+-------------------+
|  name|startswith(name, L)|
+------+-------------------+
|  Luis|               true|
|  Lola|               true|
|  Paco|              false|
|Manolo|              false|
+------+-------------------+



In [None]:
"""
También podemos editar el nombre de columnas, eliminarlas, añadir columnas y 
por supuesto añadir condiciones para ejecutar estas operaciones. 
Por ejemplo:
"""

In [33]:
#Añadir una nueva columna donde el resultado sea true si la edad es mayor que 45
print ("Add a new column Senior containing true if the person age is > 45")
data_frame.withColumn("Senior", data_frame["Age"] > 45) \
    .show()


Add a new column Senior containing true if the person age is > 45
+------+---+------+-------+-------------------+------+
|  Name|Age|Weight|HasACar|          BirthDate|Senior|
+------+---+------+-------+-------------------+------+
|  Luis| 23|  84.5|   true|2019-02-28 00:00:00| false|
|  Lola| 42|  70.2|  false|2000-10-01 00:00:00| false|
|  Paco| 66|  90.1|  false|1905-12-03 00:00:00|  true|
|Manolo| 68|  75.3|   true|2000-01-04 00:00:00|  true|
+------+---+------+-------+-------------------+------+



In [35]:
# Renombrar la columna HasACar
print ("Rename column HasACar as Owner")
data_frame.withColumnRenamed("HasACar", "Owner") \
    .show()

Rename column HasACar as Owner
+------+---+------+-----+-------------------+
|  Name|Age|Weight|Owner|          BirthDate|
+------+---+------+-----+-------------------+
|  Luis| 23|  84.5| true|2019-02-28 00:00:00|
|  Lola| 42|  70.2|false|2000-10-01 00:00:00|
|  Paco| 66|  90.1|false|1905-12-03 00:00:00|
|Manolo| 68|  75.3| true|2000-01-04 00:00:00|
+------+---+------+-----+-------------------+



In [37]:
#Eliminar una columna
print ("Remove column DateBirth")
data_frame.drop("BirthDate") \
    .show()

Remove column DateBirth
+------+---+------+-------+
|  Name|Age|Weight|HasACar|
+------+---+------+-------+
|  Luis| 23|  84.5|   true|
|  Lola| 42|  70.2|  false|
|  Paco| 66|  90.1|  false|
|Manolo| 68|  75.3|   true|
+------+---+------+-------+



In [39]:
#Siguiendo con los ejemplos de operaciones, podemos hacer un sort. Como vemos es posible hacerlo usando un sort o un orderBy
print ("Sort by age (descending)")
#Ambos sorts son equivalentes aunque con simtaxis distintas
data_frame.sort(data_frame.Age.desc()).show() #por fecha descendente
data_frame.sort("Age", ascending=False).show() #por fecha descendente

data_frame.orderBy(["Age", "Weight"], ascending=[0, 1]).show() #se ordena primero por fecha descendentemente y en caso de empate por peso ascendentemente


Sort by age (descending)
+------+---+------+-------+-------------------+
|  Name|Age|Weight|HasACar|          BirthDate|
+------+---+------+-------+-------------------+
|Manolo| 68|  75.3|   true|2000-01-04 00:00:00|
|  Paco| 66|  90.1|  false|1905-12-03 00:00:00|
|  Lola| 42|  70.2|  false|2000-10-01 00:00:00|
|  Luis| 23|  84.5|   true|2019-02-28 00:00:00|
+------+---+------+-------+-------------------+

+------+---+------+-------+-------------------+
|  Name|Age|Weight|HasACar|          BirthDate|
+------+---+------+-------+-------------------+
|Manolo| 68|  75.3|   true|2000-01-04 00:00:00|
|  Paco| 66|  90.1|  false|1905-12-03 00:00:00|
|  Lola| 42|  70.2|  false|2000-10-01 00:00:00|
|  Luis| 23|  84.5|   true|2019-02-28 00:00:00|
+------+---+------+-------+-------------------+

+------+---+------+-------+-------------------+
|  Name|Age|Weight|HasACar|          BirthDate|
+------+---+------+-------+-------------------+
|Manolo| 68|  75.3|   true|2000-01-04 00:00:00|
|  Paco| 66| 

In [None]:
#Podriamos convertir a RDD

In [51]:
# Get a RDD
rdd_from_dataframe = data_frame \
    .rdd \
    .persist()




In [53]:
"""
Podríamos realizar varias operaciones de forma equivalente usando rdd y data frames. De forma general leer usando dataframes el archivo
puede ser algo mas lento, ya que está infiriendo los tipos y el schema pero luego las operaciones, filtros, reordenaciones suelen ser más rápida.
Como se supone que el adecuado uso de spark conlleva pocas lecturas, podría ser interesante usar data frames. Además aporta mas legibilidad.
"""

'\nPodríamos realizar varias operaciones de forma equivalente usando rdd y data frames. De forma general leer usando dataframes el archivo\npuede ser algo mas lento, ya que está infiriendo los tipos y el schema pero luego las operaciones, filtros, reordenaciones suelen ser más rápida.\nComo se supone que el adecuado uso de spark conlleva pocas lecturas, podría ser interesante usar data frames. Además aporta mas legibilidad.\n'

In [None]:
#Suma de todos los pesos RDD VS DF

In [None]:
# Suma de pesos (RDD)
sum_of_weights = rdd_from_dataframe \
    .map(lambda row: row[2]) \
    .reduce(lambda x, y: x + y)  # sum()
print("Sum of weights (RDDs): " + str(sum_of_weights))




In [65]:
#Usando un dataframe podemos calcularlo de muchas maneras. Por ejemplo:
weights = data_frame \
    .select("Weight") \
    .groupBy() \
    .sum() \
    .collect()

print(weights)

print("Sum of weights (dataframe): " + str(weights[0][0]))

data_frame.select(functions.sum(data_frame["Weight"])).show()
data_frame.agg({"Weight": "sum", "Age": "min"}).show()


[Row(sum(Weight)=320.09999999999997)]
Sum of weights (dataframe): 320.09999999999997
+------------------+
|       sum(Weight)|
+------------------+
|320.09999999999997|
+------------------+

+------------------+--------+
|       sum(Weight)|min(Age)|
+------------------+--------+
|320.09999999999997|      23|
+------------------+--------+



In [None]:
#Edad media RDD VS DF

In [None]:

# Media con RDD
total_age = rdd_from_dataframe \
    .map(lambda row: row[1]) \
    .reduce(lambda x, y: x + y)

mean_age = total_age / rdd_from_dataframe.count()
print("Mean age (RDDs): " + str(mean_age))



In [71]:

# Media con Dataframe, dos ejemplos:
data_frame.select(functions.avg(data_frame["Weight"])) \
    .withColumnRenamed("avg(Weight)", "Average") \
    .show()

data_frame.agg({"Weight": "avg"}).show()

+-----------------+
|          Average|
+-----------------+
|80.02499999999999|
+-----------------+

+-----------------+
|      avg(Weight)|
+-----------------+
|80.02499999999999|
+-----------------+



In [73]:
#Otro ejemplo contar las personas según si tienen coche o no
data_frame.groupBy("HasACar")\
    .count()\
    .show()

+-------+-----+
|HasACar|count|
+-------+-----+
|   true|    2|
|  false|    2|
+-------+-----+



In [19]:
#Como último ejemplo podríamos ver que nombres son únicos:


# Contar cuántas veces aparece cada nombre
name_counts = data_frame.groupBy("Name").count()

# Filtrar solo los nombres que aparecen exactamente una vez
unique_names = name_counts.filter(col("count") == 1)

# Mostrar los nombres únicos
print("Nombres únicos en el dataset (aparecen solo una vez):")
unique_names.show()





Nombres únicos en el dataset (aparecen solo una vez):
+------+-----+
|  Name|count|
+------+-----+
|  Lola|    1|
|  Paco|    1|
|  Luis|    1|
|Manolo|    1|
+------+-----+



In [None]:
#Podemos escribir el resultado en un archivo JSON O CSV
# Write to a json file
data_frame\
    .write \
    .mode("overwrite") \
    .save("output.json", format="json")

# Write to a CSV file
data_frame\
    .write\
    .format("csv")\
    .option("header", "true") \
    .mode("overwrite")\
    .save("output.csv")

