# Basic Operations with DATA FRAME

## Primeros comando e iniciación

Primero, se importa desde pyspark.sql tanto el constructor como algunas funciones propias de la libreria.

In [1]:
from pyspark.sql import SparkSession, functions


Se inicia el constructor de spark haciendo uso de 4 núcleos para la computación de los comandos.

In [4]:
spark_session = SparkSession \
        .builder \
        .master("local[4]") \
        .getOrCreate()

logger = spark_session._jvm.org.apache.log4j
logger.LogManager.getLogger("org").setLevel(logger.Level.WARN)


Se carga el conjunto de datos en formato CSV en una variable llamada data_frame. El delimitador será la coma ( , ) y con el comando persist se quedará guardado memoria.

In [7]:
data_frame = spark_session \
        .read \
        .options(header='true', inferschema='true') \
        .option("delimiter", ",") \
        .csv("/home/master/Descargas/basicOperationData.csv") \
        .persist()

A continuación, se imprime los tipos de variables que contiene cada columna del data frame (con su titulo de columna) y se imprime el data frame completo.

In [8]:
data_frame.printSchema()
data_frame.show()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Weight: double (nullable = true)
 |-- HasACar: boolean (nullable = true)
 |-- BirthDate: timestamp (nullable = true)

+------+---+------+-------+-------------------+
|  Name|Age|Weight|HasACar|          BirthDate|
+------+---+------+-------+-------------------+
|  Luis| 23|  84.5|   true|2019-03-01 00:00:00|
|  Lola| 42|  70.2|  false|2000-10-01 00:00:00|
|  Paco| 66|  90.1|  false|1905-12-03 00:00:00|
|Manolo| 68|  75.3|   true|2000-01-04 00:00:00|
+------+---+------+-------+-------------------+



Otra forma de ver los tipos de variable de las columnas del data frame es la siguiente:

In [9]:
print("data types: " + str(data_frame.dtypes))

data types: [('Name', 'string'), ('Age', 'int'), ('Weight', 'double'), ('HasACar', 'boolean'), ('BirthDate', 'timestamp')]


Se puede hacer un pre-análisis mediante el comando describe. EL cual muestra la cantidad de filas de cada columna, la media (si la columna  es numérica), la desviación standard, el mínimo y el máximo de cada columna.

In [11]:
data_frame.describe().show()

+-------+----+------------------+-----------------+
|summary|Name|               Age|           Weight|
+-------+----+------------------+-----------------+
|  count|   4|                 4|                4|
|   mean|null|             49.75|80.02499999999999|
| stddev|null|21.391197566600457|8.951489633947338|
|    min|Lola|                23|             70.2|
|    max|Paco|                68|             90.1|
+-------+----+------------------+-----------------+



In [12]:
data_frame.explain()

== Physical Plan ==
InMemoryTableScan [Name#10, Age#11, Weight#12, HasACar#13, BirthDate#14]
   +- InMemoryRelation [Name#10, Age#11, Weight#12, HasACar#13, BirthDate#14], StorageLevel(disk, memory, 1 replicas)
         +- *(1) FileScan csv [Name#10,Age#11,Weight#12,HasACar#13,BirthDate#14] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/home/master/Descargas/basicOperationData.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Name:string,Age:int,Weight:double,HasACar:boolean,BirthDate:timestamp>


## Comandos para acceder a un data frame

Para acceder/seleccionar toda las filas de la columna "Name":

In [13]:
data_frame.select("Name").show()

+------+
|  Name|
+------+
|  Luis|
|  Lola|
|  Paco|
|Manolo|
+------+



Se selecciona todo de la columna "Name" y "Age", además de sumar 1 a la edad:

In [15]:
data_frame.select("Name", data_frame["Age"]).show()
data_frame.select("Name", data_frame["Age"] + 1).show()

+------+---+
|  Name|Age|
+------+---+
|  Luis| 23|
|  Lola| 42|
|  Paco| 66|
|Manolo| 68|
+------+---+

+------+---------+
|  Name|(Age + 1)|
+------+---------+
|  Luis|       24|
|  Lola|       43|
|  Paco|       67|
|Manolo|       69|
+------+---------+



Para seleccionar aquellas filas que cumplan una condición. En este caso aquellas cuyo nombre tenga más de 4 letras:

In [17]:
data_frame.select("Name").show()
data_frame.select(functions.length(data_frame["Name"]) > 4).show()

+------+
|  Name|
+------+
|  Luis|
|  Lola|
|  Paco|
|Manolo|
+------+

+------------------+
|(length(Name) > 4)|
+------------------+
|             false|
|             false|
|             false|
|              true|
+------------------+



Si se quiere escoger aquellos nombres que empiezan por la letra "L":

In [18]:
data_frame.select("name", data_frame["name"].startswith("L")).show()

+------+-------------------+
|  name|startswith(name, L)|
+------+-------------------+
|  Luis|               true|
|  Lola|               true|
|  Paco|              false|
|Manolo|              false|
+------+-------------------+



## Comandos para manipulación y modificacion del data frame

Para añadir una nueva columna al dataset llamada "Senior" pero que cumpla la condicion de que sea solamente aquellas personas cuya edad supere los 45 años:

In [19]:
data_frame.withColumn("Senior", data_frame["Age"] > 45).show()

+------+---+------+-------+-------------------+------+
|  Name|Age|Weight|HasACar|          BirthDate|Senior|
+------+---+------+-------+-------------------+------+
|  Luis| 23|  84.5|   true|2019-03-01 00:00:00| false|
|  Lola| 42|  70.2|  false|2000-10-01 00:00:00| false|
|  Paco| 66|  90.1|  false|1905-12-03 00:00:00|  true|
|Manolo| 68|  75.3|   true|2000-01-04 00:00:00|  true|
+------+---+------+-------+-------------------+------+



Para modificar el título de una columna:

In [21]:
data_frame.withColumnRenamed("HasACar", "Owner") \
        .show()

+------+---+------+-----+-------------------+
|  Name|Age|Weight|Owner|          BirthDate|
+------+---+------+-----+-------------------+
|  Luis| 23|  84.5| true|2019-03-01 00:00:00|
|  Lola| 42|  70.2|false|2000-10-01 00:00:00|
|  Paco| 66|  90.1|false|1905-12-03 00:00:00|
|Manolo| 68|  75.3| true|2000-01-04 00:00:00|
+------+---+------+-----+-------------------+



Si se quiere eliminar la columna llamada "BirthDate":

In [22]:
data_frame.drop("BirthDate") \
        .show()

+------+---+------+-------+
|  Name|Age|Weight|HasACar|
+------+---+------+-------+
|  Luis| 23|  84.5|   true|
|  Lola| 42|  70.2|  false|
|  Paco| 66|  90.1|  false|
|Manolo| 68|  75.3|   true|
+------+---+------+-------+



Si se quiere ordenar una columna:

In [24]:
data_frame.sort(data_frame.Age.desc()).show()

+------+---+------+-------+-------------------+
|  Name|Age|Weight|HasACar|          BirthDate|
+------+---+------+-------+-------------------+
|Manolo| 68|  75.3|   true|2000-01-04 00:00:00|
|  Paco| 66|  90.1|  false|1905-12-03 00:00:00|
|  Lola| 42|  70.2|  false|2000-10-01 00:00:00|
|  Luis| 23|  84.5|   true|2019-03-01 00:00:00|
+------+---+------+-------+-------------------+



In [25]:
data_frame.sort("Age", ascending=False).show()

+------+---+------+-------+-------------------+
|  Name|Age|Weight|HasACar|          BirthDate|
+------+---+------+-------+-------------------+
|Manolo| 68|  75.3|   true|2000-01-04 00:00:00|
|  Paco| 66|  90.1|  false|1905-12-03 00:00:00|
|  Lola| 42|  70.2|  false|2000-10-01 00:00:00|
|  Luis| 23|  84.5|   true|2019-03-01 00:00:00|
+------+---+------+-------+-------------------+



Para ordenar la edad de forma descendente y el peso de forma ascendente:

In [27]:
data_frame.orderBy(["Age", "Weight"], ascending=[0, 1]).show()

+------+---+------+-------+-------------------+
|  Name|Age|Weight|HasACar|          BirthDate|
+------+---+------+-------+-------------------+
|Manolo| 68|  75.3|   true|2000-01-04 00:00:00|
|  Paco| 66|  90.1|  false|1905-12-03 00:00:00|
|  Lola| 42|  70.2|  false|2000-10-01 00:00:00|
|  Luis| 23|  84.5|   true|2019-03-01 00:00:00|
+------+---+------+-------+-------------------+








Si se desea obtener un RDD a partir de una data frame:

In [26]:
rdd_from_dataframe = data_frame \
        .rdd \
        .persist()

for i in rdd_from_dataframe.collect():
    print(i)

Row(Name='Luis', Age=23, Weight=84.5, HasACar=True, BirthDate=datetime.datetime(2019, 3, 1, 0, 0))
Row(Name='Lola', Age=42, Weight=70.2, HasACar=False, BirthDate=datetime.datetime(2000, 10, 1, 0, 0))
Row(Name='Paco', Age=66, Weight=90.1, HasACar=False, BirthDate=datetime.datetime(1905, 12, 3, 0, 0))
Row(Name='Manolo', Age=68, Weight=75.3, HasACar=True, BirthDate=datetime.datetime(2000, 1, 4, 0, 0))


## Comandos para operar en un data frame

Para sumar todos los pesos (desde el RDD anteriormente creado):

In [28]:
sum_of_weights = rdd_from_dataframe \
        .map(lambda row: row[2]) \
        .reduce(lambda x, y: x + y)  # sum()
print("Sum of weights (RDDs): " + str(sum_of_weights))

Sum of weights (RDDs): 320.09999999999997


Para sumar todos los pesos pero a partir del data frame (dos formas distintas):

In [29]:
weights = data_frame \
        .select("Weight") \
        .groupBy() \
        .sum() \
        .collect()

print(weights)
print("Sum of weights (dataframe): " + str(weights[0][0]))

[Row(sum(Weight)=320.09999999999997)]
Sum of weights (dataframe): 320.09999999999997


Para el cálculo de la media de la edad(RDD):

In [31]:
total_age = rdd_from_dataframe \
        .map(lambda row: row[1]) \
        .reduce(lambda x, y: x + y)

mean_age = total_age / rdd_from_dataframe.count()

print("Mean age (RDDs): " + str(mean_age))

Mean age (RDDs): 49.75


Para el cálculo de la media de la edad (data frame) de dos formas distintas:

In [32]:
data_frame.select(functions.avg(data_frame["Weight"])) \
        .withColumnRenamed("avg(Weight)", "Average") \
        .show()

data_frame.agg({"Weight": "avg"}).show()

+-----------------+
|          Average|
+-----------------+
|80.02499999999999|
+-----------------+

+-----------------+
|      avg(Weight)|
+-----------------+
|80.02499999999999|
+-----------------+



## Exportar a formato JSON o CSV los resultados obtenidos

In [None]:
# Escribir a JSON
    data_frame\
        .write\
        .save("output.json", format="json")

In [None]:
# Escribir a CSV
    data_frame\
        .write\
        .format("csv")\
        .save("output.csv")