# RDD creation

#### [Introduction to Spark with Python, by Jose A. Dianes](https://github.com/jadianes/spark-py-notebooks)

Apache Spark works with datasets called RDDs (Resilient Distributed Datasets). They have several characteristics that set them apart from other data structures:
  + Immutable: once created, they cannot be modified.
  + Distributed: the data is split into partitions that are spread across the cluster.
  + Resilient: if a partition is lost, it is regenerated automatically.

Although RDDs are immutable, they can be transformed: each transformation creates a new RDD that holds the transformed data, leaving the original RDD unchanged.

There are several ways to create an RDD:
  + From a file.
  + By distributing data from the driver.
  + By transforming an existing RDD into a new one.
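
The properties listed above can be illustrated with a small pure-Python toy. This is only a sketch: `ToyRDD` and its methods are hypothetical names invented for this example, not part of Spark, and the real partitioning and fault-recovery logic is far more involved. In PySpark the analogous calls would be `sc.parallelize(...)` and `rdd.map(...)`.

```python
from dataclasses import dataclass
from typing import Callable, Sequence, Tuple


@dataclass(frozen=True)
class ToyRDD:
    """Toy stand-in for an RDD (hypothetical): immutable data split into partitions."""
    partitions: Tuple[Tuple[int, ...], ...]

    @staticmethod
    def parallelize(data: Sequence[int], num_partitions: int) -> "ToyRDD":
        # Split driver-side data into contiguous chunks, one per partition.
        size = -(-len(data) // num_partitions)  # ceiling division
        chunks = tuple(
            tuple(data[i * size:(i + 1) * size]) for i in range(num_partitions)
        )
        return ToyRDD(chunks)

    def map(self, fn: Callable[[int], int]) -> "ToyRDD":
        # A transformation never mutates this object; it returns a new ToyRDD.
        return ToyRDD(tuple(tuple(fn(x) for x in part) for part in self.partitions))

    def collect(self) -> list:
        # Gather every partition back into a single list.
        return [x for part in self.partitions for x in part]


numbers = ToyRDD.parallelize([1, 2, 3, 4, 5, 6], num_partitions=2)
squares = numbers.map(lambda x: x * x)

print(numbers.partitions)  # ((1, 2, 3), (4, 5, 6))
print(numbers.collect())   # [1, 2, 3, 4, 5, 6]  (the original is unchanged)
print(squares.collect())   # [1, 4, 9, 16, 25, 36]
```

Because the dataclass is frozen, assigning to `numbers.partitions` raises an error, which mirrors the immutability described in the list.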

## RDD lifecycle

![RDD lifecycle](https://keepcoding.io/wp-content/uploads/2022/06/image-39-1024x473.png)

# SparkContext

SparkContext is the entry point to Spark.

To run any operation we need a context:
  + SparkContext, SQLContext...

Which one to use depends on the type of operation. Originally there was only SparkContext, which was used for operations on RDDs. SparkSession came later and covers RDDs, DataFrames, and Datasets.

Internally, SparkSession wraps SparkContext, HiveContext, SQLContext...

SparkSession therefore works as a single entry point for all of these contexts.

In general, SparkSession is the recommended choice, since it establishes a session with the master node.

## PySpark

**PySpark** is the **Python** programming interface for the **Apache Spark** distributed processing framework.

**Spark** is a high-performance distributed data processing engine used to process large volumes of data scalably and efficiently on computer clusters.

**PySpark** is commonly used for data processing, machine learning, real-time data analysis, and building applications that process large volumes of data.

_**Using PySpark in Jupyter:** https://changhsinlee.com/install-pyspark-windows-jupyter/_

In [1]:
#import findspark
#findspark.init()
# Connect to a Spark session
import pyspark
from pyspark.sql import SparkSession

**`PySpark` documentation**: https://spark.apache.org/docs/3.1.1/api/python/reference/index.html

In [2]:
# Use environment variables to point the PySpark driver and workers at the current Python interpreter

import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [3]:
spark = SparkSession.builder.appName("pyspark_teoria").config("spark.cores.max", 4).getOrCreate()
spark

In [4]:
# Check the configured maximum number of cores
configured_cores = spark.conf.get("spark.cores.max")
print("Configured number of cores:", configured_cores)


Configured number of cores: 4


In [5]:
# Number of block managers (driver plus executors) registered; this is not a core count
executors = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
executors

1

### Load a DataFrame

In [7]:
# Read a file with PySpark
# `schema=` also accepts an explicit schema of your choice
titanic = spark.read.csv(path        = "../02.data/titanic.txt",
                         inferSchema = True, header = True)
# inferSchema=True: Spark tries to infer the schema from the input data, i.e. it guesses the data type of each column.
# header=True: Spark treats the first row of the file as column names (headers) and uses them when building the DataFrame's schema.
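
To make the `inferSchema` and `header` options above more concrete, here is a simplified pure-Python sketch of the idea. It is an illustration only: the `infer_type` helper is hypothetical, and Spark's real inference handles more types (dates, booleans, longs) and works on distributed data.

```python
import csv
import io


def infer_type(values):
    """Return the narrowest type name that fits every non-empty value (toy rule set)."""
    present = [v for v in values if v != ""]
    if not present:
        return "string"  # nothing to infer from
    for type_name, caster in (("int", int), ("double", float)):
        try:
            for v in present:
                caster(v)
            return type_name
        except ValueError:
            continue  # try the next, wider type
    return "string"


# Two rows taken from the titanic data shown in this notebook (subset of columns).
sample = io.StringIO(
    'PassengerId,Name,Age,Cabin\n'
    '1,"Braund, Mr. Owen Harris",22.0,\n'
    '2,"Cumings, Mrs. John Bradley (Florence Briggs Thayer)",38.0,C85\n'
)
reader = csv.reader(sample)
header = next(reader)        # header=True: the first row holds the column names
rows = list(reader)
columns = list(zip(*rows))   # transpose rows into columns
schema = {name: infer_type(col) for name, col in zip(header, columns)}

print(schema)
# {'PassengerId': 'int', 'Name': 'string', 'Age': 'double', 'Cabin': 'string'}
```

The empty `Cabin` value in the first row is skipped during inference, which is why a column with missing values can still receive a type.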

In [8]:
titanic

DataFrame[PassengerId: int, Survived: int, Pclass: int, Name: string, Sex: string, Age: double, SibSp: int, Parch: int, Ticket: string, Fare: double, Cabin: string, Embarked: string]

In [9]:
# show() prints the table; truncate=False keeps full column contents instead of shortening them; n = number of rows to show
titanic.show(n=5, truncate = False)

+-----------+--------+------+---------------------------------------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|Name                                               |Sex   |Age |SibSp|Parch|Ticket          |Fare   |Cabin|Embarked|
+-----------+--------+------+---------------------------------------------------+------+----+-----+-----+----------------+-------+-----+--------+
|1          |0       |3     |Braund, Mr. Owen Harris                            |male  |22.0|1    |0    |A/5 21171       |7.25   |NULL |S       |
|2          |1       |1     |Cumings, Mrs. John Bradley (Florence Briggs Thayer)|female|38.0|1    |0    |PC 17599        |71.2833|C85  |C       |
|3          |1       |3     |Heikkinen, Miss. Laina                             |female|26.0|0    |0    |STON/O2. 3101282|7.925  |NULL |S       |
|4          |1       |1     |Futrelle, Mrs. Jacques Heath (Lily May Peel)       |female|35.0|1    |0    |113803          |53

In [10]:
titanic.limit(5).show(truncate = False)

+-----------+--------+------+---------------------------------------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|Name                                               |Sex   |Age |SibSp|Parch|Ticket          |Fare   |Cabin|Embarked|
+-----------+--------+------+---------------------------------------------------+------+----+-----+-----+----------------+-------+-----+--------+
|1          |0       |3     |Braund, Mr. Owen Harris                            |male  |22.0|1    |0    |A/5 21171       |7.25   |NULL |S       |
|2          |1       |1     |Cumings, Mrs. John Bradley (Florence Briggs Thayer)|female|38.0|1    |0    |PC 17599        |71.2833|C85  |C       |
|3          |1       |3     |Heikkinen, Miss. Laina                             |female|26.0|0    |0    |STON/O2. 3101282|7.925  |NULL |S       |
|4          |1       |1     |Futrelle, Mrs. Jacques Heath (Lily May Peel)       |female|35.0|1    |0    |113803          |53

In [11]:
# toPandas() collects every row to the driver, so apply limit() first to convert only a few rows
titanic.limit(5).toPandas()

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,1,0,3,"Braund, Mr. Owen Harris",male,22.0,1,0,A/5 21171,7.25,,S
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1,0,PC 17599,71.2833,C85,C
2,3,1,3,"Heikkinen, Miss. Laina",female,26.0,0,0,STON/O2. 3101282,7.925,,S
3,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1,0,113803,53.1,C123,S
4,5,0,3,"Allen, Mr. William Henry",male,35.0,0,0,373450,8.05,,S


In [7]:
titanic.show(5, truncate = True)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+------

`first()` returns the first row of the DataFrame

In [16]:
titanic.first()

Row(PassengerId=1, Survived=0, Pclass=3, Name='Braund, Mr. Owen Harris', Sex='male', Age=22.0, SibSp=1, Parch=0, Ticket='A/5 21171', Fare=7.25, Cabin=None, Embarked='S')

Return 5 rows of the DataFrame and convert them to pandas

In [17]:
titanic.limit(5).toPandas()

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,1,0,3,"Braund, Mr. Owen Harris",male,22.0,1,0,A/5 21171,7.25,,S
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1,0,PC 17599,71.2833,C85,C
2,3,1,3,"Heikkinen, Miss. Laina",female,26.0,0,0,STON/O2. 3101282,7.925,,S
3,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1,0,113803,53.1,C123,S
4,5,0,3,"Allen, Mr. William Henry",male,35.0,0,0,373450,8.05,,S


### Data Validation 

`printSchema()` prints the schema of our DataFrame

In [10]:
type(titanic)

pyspark.sql.dataframe.DataFrame

In [12]:
# Show the schema of the titanic table
titanic.printSchema()
# double stores floating-point numbers in double precision (64 bits)
# float uses single precision (32 bits)

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)
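
The comments in the cell above distinguish `double` (64 bits) from `float` (32 bits). The practical difference is precision, which the following standard-library sketch illustrates by round-tripping a value through each width. It uses Python's `struct` module rather than Spark, on the assumption that both follow the usual IEEE 754 layouts.

```python
import struct

value = 0.1

# Round-trip through 4 bytes (32-bit float) and 8 bytes (64-bit double).
as_float32 = struct.unpack("f", struct.pack("f", value))[0]
as_float64 = struct.unpack("d", struct.pack("d", value))[0]

print(as_float64 == value)   # True: a Python float is already a 64-bit double
print(as_float32 == value)   # False: precision was lost in 32 bits
print(as_float32)            # close to 0.1, but not equal to it
```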



`columns` returns the DataFrame's column names as a Python list that we can iterate over

In [12]:
titanic.columns

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked']

`count()` returns the number of rows in our PySpark DataFrame

In [13]:
titanic.count()

891

`describe()`, as in pandas, returns descriptive statistics for our data.

In [20]:
titanic.describe().toPandas()

Unnamed: 0,summary,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,count,891.0,891.0,891.0,891,891,714.0,891.0,891.0,891,891.0,204,889
1,mean,446.0,0.3838383838383838,2.308641975308642,,,29.69911764705882,0.5230078563411896,0.3815937149270482,260318.54916792738,32.2042079685746,,
2,stddev,257.3538420152301,0.4865924542648575,0.8360712409770491,,,14.526497332334037,1.1027434322934315,0.8060572211299488,471609.26868834975,49.69342859718089,,
3,min,1.0,0.0,1.0,"""Andersson, Mr. August Edvard (""""Wennerstrom"""")""",female,0.42,0.0,0.0,110152,0.0,A10,C
4,max,891.0,1.0,3.0,"van Melkebeke, Mr. Philemon",male,80.0,8.0,6.0,WE/P 5735,512.3292,T,S


In [22]:
titanic.describe().toPandas().T.rename(columns={0:'count',1:'mean',2:'stddev',3:'min',4:'max'}).drop('summary', axis=0)

Unnamed: 0,count,mean,stddev,min,max
PassengerId,891,446.0,257.3538420152301,1,891
Survived,891,0.3838383838383838,0.4865924542648575,0,1
Pclass,891,2.308641975308642,0.8360712409770491,1,3
Name,891,,,"""Andersson, Mr. August Edvard (""""Wennerstrom"""")""","van Melkebeke, Mr. Philemon"
Sex,891,,,female,male
Age,714,29.69911764705882,14.526497332334037,0.42,80.0
SibSp,891,0.5230078563411896,1.1027434322934315,0,8
Parch,891,0.3815937149270482,0.8060572211299488,0,6
Ticket,891,260318.54916792735,471609.26868834975,110152,WE/P 5735
Fare,891,32.2042079685746,49.69342859718089,0.0,512.3292


In [25]:
titanic.toPandas().describe().T

Unnamed: 0,count,mean,std,min,25%,50%,75%,max
PassengerId,891.0,446.0,257.353842,1.0,223.5,446.0,668.5,891.0
Survived,891.0,0.383838,0.486592,0.0,0.0,0.0,1.0,1.0
Pclass,891.0,2.308642,0.836071,1.0,2.0,3.0,3.0,3.0
Age,714.0,29.699118,14.526497,0.42,20.125,28.0,38.0,80.0
SibSp,891.0,0.523008,1.102743,0.0,0.0,0.0,1.0,8.0
Parch,891.0,0.381594,0.806057,0.0,0.0,0.0,0.0,6.0
Fare,891.0,32.204208,49.693429,0.0,7.9104,14.4542,31.0,512.3292


`describe()` with pandas

In [23]:
titanic.toPandas().describe(include='all').T

Unnamed: 0,count,unique,top,freq,mean,std,min,25%,50%,75%,max
PassengerId,891.0,,,,446.0,257.353842,1.0,223.5,446.0,668.5,891.0
Survived,891.0,,,,0.383838,0.486592,0.0,0.0,0.0,1.0,1.0
Pclass,891.0,,,,2.308642,0.836071,1.0,2.0,3.0,3.0,3.0
Name,891.0,891.0,"Braund, Mr. Owen Harris",1.0,,,,,,,
Sex,891.0,2.0,male,577.0,,,,,,,
Age,714.0,,,,29.699118,14.526497,0.42,20.125,28.0,38.0,80.0
SibSp,891.0,,,,0.523008,1.102743,0.0,0.0,0.0,1.0,8.0
Parch,891.0,,,,0.381594,0.806057,0.0,0.0,0.0,0.0,6.0
Ticket,891.0,681.0,347082,7.0,,,,,,,
Fare,891.0,,,,32.204208,49.693429,0.0,7.9104,14.4542,31.0,512.3292


In [38]:
# schema[...] looks up a column by name; dataType returns its data type
titanic.schema["Ticket"].dataType

StringType()

In [39]:
# Check whether the column accepts nulls
titanic.schema["Ticket"].nullable

True

In [40]:
# Inspect the column's full definition
titanic.schema["Ticket"]

StructField('Ticket', StringType(), True)

In [41]:
# Select columns
titanic.select("age", "fare")

DataFrame[age: double, fare: double]

In [50]:
# Statistics that summary() accepts
titanic.select("age", "fare").summary("count", "min", "max", "mean", "stddev", "25%", "50%", "75%").show()

+-------+------------------+-----------------+
|summary|               age|             fare|
+-------+------------------+-----------------+
|  count|               714|              891|
|    min|              0.42|              0.0|
|    max|              80.0|         512.3292|
|   mean| 29.69911764705882| 32.2042079685746|
| stddev|14.526497332334035|49.69342859718089|
|    25%|              20.0|           7.8958|
|    50%|              28.0|          14.4542|
|    75%|              38.0|             31.0|
+-------+------------------+-----------------+



In [51]:
# Default statistics for the selected columns
titanic.select("age", "fare").summary().show()

+-------+------------------+-----------------+
|summary|               age|             fare|
+-------+------------------+-----------------+
|  count|               714|              891|
|   mean| 29.69911764705882| 32.2042079685746|
| stddev|14.526497332334035|49.69342859718089|
|    min|              0.42|              0.0|
|    25%|              20.0|           7.8958|
|    50%|              28.0|          14.4542|
|    75%|              38.0|             31.0|
|    max|              80.0|         512.3292|
+-------+------------------+-----------------+
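
Note that the quartiles reported by `summary()` (for example 20.0 for the 25% of `age`) differ slightly from the pandas `describe()` output earlier (20.125). A likely explanation is that pandas interpolates linearly between neighbouring values by default, while Spark reports approximate percentiles that are actual values from the column. The sketch below contrasts the two ideas on a small, hypothetical list; the nearest-rank rule is used here only as a stand-in for "returns an observed value" and is not Spark's exact algorithm.

```python
import math


def percentile_linear(sorted_values, q):
    """Linear interpolation between the two closest ranks (pandas' default behaviour)."""
    position = q * (len(sorted_values) - 1)
    lower = math.floor(position)
    upper = math.ceil(position)
    fraction = position - lower
    return sorted_values[lower] + fraction * (sorted_values[upper] - sorted_values[lower])


def percentile_nearest_rank(sorted_values, q):
    """Nearest-rank percentile: always returns a value present in the data."""
    rank = max(1, math.ceil(q * len(sorted_values)))
    return sorted_values[rank - 1]


ages = [18.0, 20.0, 21.0, 26.0]  # hypothetical sample, already sorted

print(percentile_linear(ages, 0.25))        # 19.5 (interpolated, not in the data)
print(percentile_nearest_rank(ages, 0.25))  # 18.0 (an observed value)
```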



### Specifying column dtypes

In [52]:
# Import the data type classes used to define schemas
from pyspark.sql.types import *

In [68]:
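# With no explicit schema, PySpark infers every column of this file as a string
# spark.read.json has no inferSchema or header options

people = spark.read.json(path = "../02.data/people.json")

# printSchema() prints the schema itself and returns None, so print() adds a trailing "None"
print(people.printSchema())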

people.limit(4).toPandas()

root
 |-- _corrupt_record: string (nullable = true)
 |-- city: string (nullable = true)
 |-- creditcard: string (nullable = true)
 |-- email: string (nullable = true)
 |-- mac: string (nullable = true)
 |-- name: string (nullable = true)
 |-- timestamp: string (nullable = true)

None


Unnamed: 0,_corrupt_record,city,creditcard,email,mac,name,timestamp
0,[,,,,,,
1,,Lake Gladysberg,1228-1221-1221-1431,katlyn@jenkinsmaggio.net,08:fd:0b:cd:77:f7,Keeley Bosco,2015-04-25 13:57:36 +0700
2,,,1228-1221-1221-1431,juvenal@johnston.name,90:4d:fa:42:63:a2,Rubye Jerde,2015-04-25 09:02:04 +0700
3,,,,,f9:0e:d3:40:cb:e9,Miss Darian Breitenberg,2015-04-25 13:16:03 +0700


In [69]:
# Change the dtype of "timestamp" to DateType(); the columns are returned in the order given in the schema

data_schema = list((StructField("timestamp" ,   DateType(), True),
                    StructField("name"      , StringType(), True),
                    StructField("email"     , StringType(), True),
                    StructField("city"      , StringType(), True),
                    StructField("mac"       , StringType(), True),
                    StructField("creditcard", StringType(), True)))

#final_struc = StructType(fields = data_schema)

# Read the file again, this time specifying the schema

people = spark.read.json(path   = "../02.data/people.json",
                         schema = StructType(fields = data_schema))

In [70]:
people.printSchema()

root
 |-- timestamp: date (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- city: string (nullable = true)
 |-- mac: string (nullable = true)
 |-- creditcard: string (nullable = true)



In [71]:
people.toPandas()

Unnamed: 0,timestamp,name,email,city,mac,creditcard
0,,,,,,
1,2015-04-25,Keeley Bosco,katlyn@jenkinsmaggio.net,Lake Gladysberg,08:fd:0b:cd:77:f7,1228-1221-1221-1431
2,2015-04-25,Rubye Jerde,juvenal@johnston.name,,90:4d:fa:42:63:a2,1228-1221-1221-1431
3,2015-04-25,Miss Darian Breitenberg,,,f9:0e:d3:40:cb:e9,
4,2015-04-25,Celine Ankunding,emery_kunze@rogahn.net,,3a:af:c9:0b:5c:08,1228-1221-1221-1431
...,...,...,...,...,...,...
9997,2015-04-25,Isobel Reinger,deven@leuschke.org,,65:70:d3:6d:7c:f1,1228-1221-1221-1431
9998,2015-04-25,Chelsea Shields,,,44:1b:7d:5e:c4:da,1211-1221-1234-2201
9999,2015-04-25,Luciano Gutmann,lindsey_kuvalis@lesch.name,,c3:03:c2:65:13:fa,1234-2121-1221-1211
10000,2015-04-25,Austin Langworth,alexys@wilkinsongrady.com,,f5:43:f0:3f:8c:36,1212-1221-1121-1234


### Searching and Filtering

In [72]:
# Import the PySpark SQL functions (col, etc.) used to search and filter tables
# Note: this wildcard import shadows Python built-ins such as sum, min and max
from pyspark.sql.functions import *

In [74]:
fifa = spark.read.csv(path        = "../02.data/fifa19.csv",
                      inferSchema = True, header = True)

fifa.limit(4).toPandas()

Unnamed: 0,_c0,ID,Name,Age,Photo,Nationality,Flag,Overall,Potential,Club,...,Composure,Marking,StandingTackle,SlidingTackle,GKDiving,GKHandling,GKKicking,GKPositioning,GKReflexes,Release Clause
0,0,158023,L. Messi,31,https://cdn.sofifa.org/players/4/19/158023.png,Argentina,https://cdn.sofifa.org/flags/52.png,94,94,FC Barcelona,...,96,33,28,26,6,11,15,14,8,€226.5M
1,1,20801,Cristiano Ronaldo,33,https://cdn.sofifa.org/players/4/19/20801.png,Portugal,https://cdn.sofifa.org/flags/38.png,94,94,Juventus,...,95,28,31,23,7,11,15,14,11,€127.1M
2,2,190871,Neymar Jr,26,https://cdn.sofifa.org/players/4/19/190871.png,Brazil,https://cdn.sofifa.org/flags/54.png,92,93,Paris Saint-Germain,...,94,27,24,33,9,9,15,15,11,€228.1M
3,3,193080,De Gea,27,https://cdn.sofifa.org/players/4/19/193080.png,Spain,https://cdn.sofifa.org/flags/45.png,91,93,Manchester United,...,68,15,21,13,90,85,87,88,94,€138.6M


In [75]:
fifa.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- ID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Photo: string (nullable = true)
 |-- Nationality: string (nullable = true)
 |-- Flag: string (nullable = true)
 |-- Overall: integer (nullable = true)
 |-- Potential: integer (nullable = true)
 |-- Club: string (nullable = true)
 |-- Club Logo: string (nullable = true)
 |-- Value: string (nullable = true)
 |-- Wage: string (nullable = true)
 |-- Special: integer (nullable = true)
 |-- Preferred Foot: string (nullable = true)
 |-- International Reputation: integer (nullable = true)
 |-- Weak Foot: integer (nullable = true)
 |-- Skill Moves: integer (nullable = true)
 |-- Work Rate: string (nullable = true)
 |-- Body Type: string (nullable = true)
 |-- Real Face: string (nullable = true)
 |-- Position: string (nullable = true)
 |-- Jersey Number: integer (nullable = true)
 |-- Joined: string (nullable = true)
 |-- Loaned From: string (nu

In [78]:
# To select columns, use .select and pass a list of column names (the square brackets are optional)

fifa.select(["Nationality", "Name", "Age", "Photo"]).show(n=5, truncate = False)

+-----------+-----------------+---+----------------------------------------------+
|Nationality|Name             |Age|Photo                                         |
+-----------+-----------------+---+----------------------------------------------+
|Argentina  |L. Messi         |31 |https://cdn.sofifa.org/players/4/19/158023.png|
|Portugal   |Cristiano Ronaldo|33 |https://cdn.sofifa.org/players/4/19/20801.png |
|Brazil     |Neymar Jr        |26 |https://cdn.sofifa.org/players/4/19/190871.png|
|Spain      |De Gea           |27 |https://cdn.sofifa.org/players/4/19/193080.png|
|Belgium    |K. De Bruyne     |27 |https://cdn.sofifa.org/players/4/19/192985.png|
+-----------+-----------------+---+----------------------------------------------+
only showing top 5 rows



In [79]:
fifa.select("Nationality", "Name", "Age", "Photo").show(n=5, truncate = False)

+-----------+-----------------+---+----------------------------------------------+
|Nationality|Name             |Age|Photo                                         |
+-----------+-----------------+---+----------------------------------------------+
|Argentina  |L. Messi         |31 |https://cdn.sofifa.org/players/4/19/158023.png|
|Portugal   |Cristiano Ronaldo|33 |https://cdn.sofifa.org/players/4/19/20801.png |
|Brazil     |Neymar Jr        |26 |https://cdn.sofifa.org/players/4/19/190871.png|
|Spain      |De Gea           |27 |https://cdn.sofifa.org/players/4/19/193080.png|
|Belgium    |K. De Bruyne     |27 |https://cdn.sofifa.org/players/4/19/192985.png|
+-----------+-----------------+---+----------------------------------------------+
only showing top 5 rows



In [81]:
(
    fifa
    .select(
        col('Nationality').alias('Nacionalidad'), 
        col('Name').alias('Nombre'),
        col('Age').alias('Edad'),
        col('Photo').alias('Fotografía')
    )
).limit(5).toPandas()

Unnamed: 0,Nacionalidad,Nombre,Edad,Fotografía
0,Argentina,L. Messi,31,https://cdn.sofifa.org/players/4/19/158023.png
1,Portugal,Cristiano Ronaldo,33,https://cdn.sofifa.org/players/4/19/20801.png
2,Brazil,Neymar Jr,26,https://cdn.sofifa.org/players/4/19/190871.png
3,Spain,De Gea,27,https://cdn.sofifa.org/players/4/19/193080.png
4,Belgium,K. De Bruyne,27,https://cdn.sofifa.org/players/4/19/192985.png


In [82]:
# col() references the column we want to operate on
# alias() gives it a new name
(
    fifa
    .select(
        col('Nationality').alias('Nacionalidad'), 
        col('Name').alias('Nombre'),
        col('Age').alias('Edad'),
        col('Photo').alias('Fotografía')
    )
).show(n=5, truncate=False)

+------------+-----------------+----+----------------------------------------------+
|Nacionalidad|Nombre           |Edad|Fotografía                                    |
+------------+-----------------+----+----------------------------------------------+
|Argentina   |L. Messi         |31  |https://cdn.sofifa.org/players/4/19/158023.png|
|Portugal    |Cristiano Ronaldo|33  |https://cdn.sofifa.org/players/4/19/20801.png |
|Brazil      |Neymar Jr        |26  |https://cdn.sofifa.org/players/4/19/190871.png|
|Spain       |De Gea           |27  |https://cdn.sofifa.org/players/4/19/193080.png|
|Belgium     |K. De Bruyne     |27  |https://cdn.sofifa.org/players/4/19/192985.png|
+------------+-----------------+----+----------------------------------------------+
only showing top 5 rows



In [92]:
# orderBy sorts ascending by default (ascending=True)

fifa.select(["Name", "Age"])\
    .orderBy(fifa["Age"], ascending=False).show(fifa.count())

+--------------------+---+
|                Name|Age|
+--------------------+---+
|            O. Pérez| 45|
|           T. Warner| 44|
|       K. Pilkington| 44|
|         S. Narazaki| 42|
|           J. Villar| 41|
|        H. Sulaimani| 41|
|            B. Nivet| 41|
|            M. Tyler| 41|
|            C. Muñoz| 41|
|         S. Phillips| 40|
|           G. Buffon| 40|
|         B. Castillo| 40|
|              Hilton| 40|
|            F. Kippe| 40|
|         A. Bizzarri| 40|
|             W. Díaz| 40|
|          S. Bertoli| 40|
|         Y. Nakazawa| 40|
|         S. Nakamura| 40|
|     P. van der Vlag| 40|
|        C. Lucchetti| 40|
|        A. Al Basisi| 40|
|       S. Sorrentino| 39|
|            R. Kawai| 39|
|          P. Guiñazú| 39|
|          M. Caranta| 39|
|          C. Pizarro| 39|
|           Zhou Ting| 39|
|           Cifuentes| 39|
|        M. Ogasawara| 39|
|         D. Dainelli| 39|
|  D. Konstantopoulos| 39|
|       S. Pellissier| 39|
|           D. Bulman| 39|
|

In [106]:
# asc() = ascending
# desc() = descending
fifa.select(["Name", "Age"])\
    .orderBy(fifa["age"].asc()).show(fifa.count())

+--------------------+---+
|                Name|Age|
+--------------------+---+
|           B. Nygren| 16|
|         W. Geubbels| 16|
|            A. Doğan| 16|
|            A. Taoui| 16|
|          C. Bassett| 16|
|      Pelayo Morilla| 16|
|            B. Mumba| 16|
|            Guerrero| 16|
|            R. Gómez| 16|
|         H. Massengo| 16|
|        H. Andersson| 16|
|      Y. Verschaeren| 16|
|     P. Samiec-Talar| 16|
|           Y. Roemer| 16|
|         L. D'Arrigo| 16|
|         Y. Begraoui| 16|
|            K. Broda| 16|
|            J. Lahne| 16|
|           J. Olstad| 16|
|         J. Italiano| 16|
|            E. Ceide| 16|
|           S. Steijn| 16|
|            B. Waine| 16|
|         J. Kitolano| 16|
|            L. Smyth| 16|
|          D. Adshead| 16|
|      M. Köstenbauer| 16|
|        A. Mahlonoko| 16|
|      F. Tauchhammer| 16|
|            R. Hauge| 16|
|            M. Tilio| 16|
|          J. Rowland| 16|
|           M. Larsen| 16|
|        J. Imbrechts| 16|
|

In [105]:
fifa.select(["Name"])\
    .orderBy(fifa["Age"].desc()).show(fifa.count())

+--------------------+
|                Name|
+--------------------+
|            O. Pérez|
|           T. Warner|
|       K. Pilkington|
|         S. Narazaki|
|           J. Villar|
|        H. Sulaimani|
|            B. Nivet|
|            M. Tyler|
|            C. Muñoz|
|         S. Phillips|
|           G. Buffon|
|         B. Castillo|
|              Hilton|
|            F. Kippe|
|         A. Bizzarri|
|             W. Díaz|
|          S. Bertoli|
|         Y. Nakazawa|
|         S. Nakamura|
|     P. van der Vlag|
|        C. Lucchetti|
|        A. Al Basisi|
|       S. Sorrentino|
|            R. Kawai|
|          P. Guiñazú|
|          M. Caranta|
|          C. Pizarro|
|           Zhou Ting|
|           Cifuentes|
|        M. Ogasawara|
|         D. Dainelli|
|  D. Konstantopoulos|
|       S. Pellissier|
|           D. Bulman|
|           T. Howard|
|           M. Gurski|
|        N. Fernández|
|          K. Ellison|
|          N. Rimando|
|           F. Cubero|
|          

In [103]:
(
    fifa
    .select(
        'Name', 
        'Age',
        'Club'
    )
    .orderBy(
        'age', ascending=False
    )
).show(5)

+-------------+---+------------------+
|         Name|Age|              Club|
+-------------+---+------------------+
|     O. Pérez| 45|           Pachuca|
|    T. Warner| 44|Accrington Stanley|
|K. Pilkington| 44|  Cambridge United|
|  S. Narazaki| 42|    Nagoya Grampus|
|    J. Villar| 41|              NULL|
+-------------+---+------------------+
only showing top 5 rows



Ascending

In [107]:
(
    fifa
    .select('Name', 'Age')
    .orderBy(col('age').asc())
).show(5)

+--------------+---+
|          Name|Age|
+--------------+---+
|     B. Nygren| 16|
|   W. Geubbels| 16|
|  H. Andersson| 16|
|Y. Verschaeren| 16|
|      A. Doğan| 16|
+--------------+---+
only showing top 5 rows



In [35]:
# .desc()

fifa.select(["Name", "Age"])\
    .orderBy(fifa["Age"].desc()).show(5)

+-------------+---+
|         Name|Age|
+-------------+---+
|     O. Pérez| 45|
|K. Pilkington| 44|
|    T. Warner| 44|
|  S. Narazaki| 42|
|     C. Muñoz| 41|
+-------------+---+
only showing top 5 rows



In [36]:
(
    fifa
    .select('Name', 'Age')
    .orderBy(col('age').desc())
).show(5)

+-------------+---+
|         Name|Age|
+-------------+---+
|     O. Pérez| 45|
|K. Pilkington| 44|
|    T. Warner| 44|
|  S. Narazaki| 42|
|    J. Villar| 41|
+-------------+---+
only showing top 5 rows
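
The descending outputs above list players of the same age (44 and 41) in different orders from one run to the next. Sorting by `Age` alone leaves the order of ties unspecified, so a secondary key is needed for a reproducible result. The pure-Python sketch below shows the idea on four rows taken from the outputs above; in PySpark the analogous call would be `.orderBy(col('Age').desc(), col('Name').asc())`.

```python
# (name, age) pairs copied from the outputs above, deliberately shuffled.
players = [
    ("T. Warner", 44),
    ("O. Pérez", 45),
    ("S. Narazaki", 42),
    ("K. Pilkington", 44),
]

# Primary key: age descending. Secondary key: name ascending, to break ties.
ranked = sorted(players, key=lambda player: (-player[1], player[0]))

for name, age in ranked:
    print(f"{name:15s}{age}")
```

With the tie-breaker in place, K. Pilkington always precedes T. Warner, whatever the input order.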



In [113]:
# To filter by a text pattern, combine .where with .like ("%" matches any sequence of characters)

fifa.select(["Name", "Club", "Age"])\
    .where(fifa.Club.like("%Barcelona%"))\
    .orderBy(col('age').asc())\
    .show(n = fifa.count(), truncate = False)

+---------------+------------+---+
|Name           |Club        |Age|
+---------------+------------+---+
|Riqui Puig     |FC Barcelona|18 |
|Miranda        |FC Barcelona|18 |
|Abel Ruiz      |FC Barcelona|18 |
|Jorge Cuenca   |FC Barcelona|18 |
|M. Wagué       |FC Barcelona|19 |
|Chumi          |FC Barcelona|19 |
|Oriol Busquets |FC Barcelona|19 |
|Guillem Jaime  |FC Barcelona|19 |
|Iñaki Peña     |FC Barcelona|19 |
|Aleñá          |FC Barcelona|20 |
|Ezkieta        |FC Barcelona|21 |
|O. Dembélé     |FC Barcelona|21 |
|Arthur         |FC Barcelona|21 |
|Malcom         |FC Barcelona|21 |
|Munir          |FC Barcelona|22 |
|C. Lenglet     |FC Barcelona|23 |
|Sergi Samper   |FC Barcelona|23 |
|S. Umtiti      |FC Barcelona|24 |
|Nélson Semedo  |FC Barcelona|24 |
|Denis Suárez   |FC Barcelona|24 |
|Rafinha        |FC Barcelona|25 |
|M. ter Stegen  |FC Barcelona|26 |
|Coutinho       |FC Barcelona|26 |
|Sergi Roberto  |FC Barcelona|26 |
|Sergio Busquets|FC Barcelona|29 |
|Jordi Alba     |FC 

Or with the `filter` function (`where` is an alias of `filter`):

In [120]:
fifa.select('Name','Club', "Age")\
    .filter(fifa["club"].like('%Barcelona%'))\
    .orderBy(fifa["age"].asc())\
    .show(5, truncate=False)

+------------+------------+---+
|Name        |Club        |Age|
+------------+------------+---+
|Miranda     |FC Barcelona|18 |
|Riqui Puig  |FC Barcelona|18 |
|Jorge Cuenca|FC Barcelona|18 |
|Abel Ruiz   |FC Barcelona|18 |
|Chumi       |FC Barcelona|19 |
+------------+------------+---+
only showing top 5 rows
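
The `like` patterns used above follow SQL wildcard rules: `%` matches any sequence of characters (including none) and `_` matches exactly one character. As a rough illustration of those rules, here is a pure-Python emulation. The `sql_like` helper is hypothetical, ignores escape characters, and the list of clubs is a made-up sample.

```python
import re


def sql_like(value, pattern):
    """Return True if `value` matches the SQL LIKE `pattern` (no ESCAPE support)."""
    regex = "".join(
        ".*" if ch == "%" else "." if ch == "_" else re.escape(ch)
        for ch in pattern
    )
    # fullmatch: the pattern must cover the whole string, as in SQL LIKE.
    return re.fullmatch(regex, value, flags=re.DOTALL) is not None


clubs = ["FC Barcelona", "Barcelona SC", "Juventus"]  # hypothetical sample

print([c for c in clubs if sql_like(c, "%Barcelona%")])  # ['FC Barcelona', 'Barcelona SC']
print(sql_like("FC Barcelona", "Barcelona"))             # False: no wildcards, so exact match only
print(sql_like("FC Barcelona", "FC Barcelon_"))          # True: "_" stands for the final "a"
print(sql_like("FC Barcelona", "%barcelona%"))           # False: matching is case-sensitive
```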




Inside `select` in PySpark you can perform various operations on existing columns and also create new columns based on those operations. Some of the operations available inside `select`:

**Column operations:**

  + Apply functions to an existing column.
  + Perform arithmetic or logical operations between columns.
  + Use string manipulation functions **(substr, concat, lower, upper, etc.)**.

**Creating new columns:**

  + Create new columns computed from existing ones.
  + Use mathematical functions **(sqrt, log, exp, etc.)** to compute new values.
  + Apply date and time functions if your DataFrame includes date or timestamp columns.

**Alias (renaming columns):**

  + Use `alias` to rename a column in the result.

In [130]:
# .substr(startPos, length) slices a string; positions are 1-based and a negative start counts from the end

fifa.select("Photo", fifa.Photo.substr(-3, 3).alias("slicing")).show(n = fifa.count(), truncate = False)

+----------------------------------------------+-------+
|Photo                                         |slicing|
+----------------------------------------------+-------+
|https://cdn.sofifa.org/players/4/19/158023.png|png    |
|https://cdn.sofifa.org/players/4/19/20801.png |png    |
|https://cdn.sofifa.org/players/4/19/190871.png|png    |
|https://cdn.sofifa.org/players/4/19/193080.png|png    |
|https://cdn.sofifa.org/players/4/19/192985.png|png    |
|https://cdn.sofifa.org/players/4/19/183277.png|png    |
|https://cdn.sofifa.org/players/4/19/177003.png|png    |
|https://cdn.sofifa.org/players/4/19/176580.png|png    |
|https://cdn.sofifa.org/players/4/19/155862.png|png    |
|https://cdn.sofifa.org/players/4/19/200389.png|png    |
|https://cdn.sofifa.org/players/4/19/188545.png|png    |
|https://cdn.sofifa.org/players/4/19/182521.png|png    |
|https://cdn.sofifa.org/players/4/19/182493.png|png    |
|https://cdn.sofifa.org/players/4/19/168542.png|png    |
|https://cdn.sofifa.org/players
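
Unlike Python slicing, `substr(startPos, length)` takes a 1-based start position and a length rather than an end index. The helper below is a hypothetical pure-Python emulation of that convention, written to cover the common cases only, so that the two styles can be compared side by side.

```python
def sql_substr(text, start, length):
    """Emulate SQL-style substr: 1-based start, negative start counts from the end."""
    if start > 0:
        begin = start - 1          # 1-based to 0-based
    elif start < 0:
        begin = len(text) + start  # count back from the end
    else:
        begin = 0                  # a start of 0 is treated like 1
    end = begin + length
    return text[max(begin, 0):max(end, 0)]


url = "https://cdn.sofifa.org/players/4/19/158023.png"

print(sql_substr(url, -3, 3))      # png  (same as url[-3:])
print(sql_substr("Spark", 1, 3))   # Spa  (same as "Spark"[0:3])
print(sql_substr("Spark", 2, 3))   # par  (same as "Spark"[1:4])
```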

In [137]:
fifa.select("Name", "Club", "Age", "Marking")\
    .filter(fifa.Club.isin("FC Barcelona", "Juventus"))\
    .orderBy(fifa.Marking.asc())\
    .show(n= fifa.count(), truncate = False)

+-----------------+------------+---+-------+
|Name             |Club        |Age|Marking|
+-----------------+------------+---+-------+
|Iñaki Peña       |FC Barcelona|19 |9      |
|Ezkieta          |FC Barcelona|21 |13     |
|J. Cillessen     |FC Barcelona|29 |18     |
|W. Szczęsny      |Juventus    |28 |20     |
|M. Perin         |Juventus    |25 |20     |
|C. Pinsoglio     |Juventus    |28 |20     |
|Malcom           |FC Barcelona|21 |21     |
|P. Dybala        |Juventus    |24 |23     |
|M. ter Stegen    |FC Barcelona|26 |25     |
|M. Kean          |Juventus    |18 |27     |
|Cristiano Ronaldo|Juventus    |33 |28     |
|L. Messi         |FC Barcelona|31 |33     |
|Munir            |FC Barcelona|22 |38     |
|O. Dembélé       |FC Barcelona|21 |42     |
|Douglas Costa    |Juventus    |27 |45     |
|Abel Ruiz        |FC Barcelona|18 |45     |
|Guillem Jaime    |FC Barcelona|19 |52     |
|Riqui Puig       |FC Barcelona|18 |53     |
|Coutinho         |FC Barcelona|26 |55     |
|M. Mandžu

In [142]:
# isin() finds rows whose value in the given column matches one of the listed values
fifa.select(fifa.columns)\
    .filter(fifa.Club.isin("FC Barcelona", "Juventus"))\
    .orderBy(fifa["Marking"].asc())\
    .show(n = fifa.count(), truncate=False)

+-----+------+-----------------+---+----------------------------------------------+------------------+------------------------------------+-------+---------+--------+-------------------------------------------+------+-----+-------+--------------+------------------------+---------+-----------+--------------+----------+---------+--------+-------------+------------+-----------+--------------------+------+------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+--------+---------+---------------+------------+-------+---------+-----+----------+-----------+-----------+------------+-----------+-------+---------+-------+---------+-------+-------+--------+---------+----------+-------------+-----------+------+---------+---------+-------+--------------+-------------+--------+----------+---------+-------------+----------+--------------+
|_c0  |ID    |Name             |Age|Photo                                         |

In [139]:
fifa.select("*")\
    .filter(fifa.Club.isin("FC Barcelona", "Juventus"))\
    .orderBy(fifa.Marking.asc())\
    .show()

+-----+------+-----------------+---+--------------------+-----------+--------------------+-------+---------+------------+--------------------+-------+-----+-------+--------------+------------------------+---------+-----------+--------------+----------+---------+--------+-------------+------------+-----------+--------------------+------+------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+--------+---------+---------------+------------+-------+---------+-----+----------+-----------+-----------+------------+-----------+-------+---------+-------+---------+-------+-------+--------+---------+----------+-------------+-----------+------+---------+---------+-------+--------------+-------------+--------+----------+---------+-------------+----------+--------------+
|  _c0|    ID|             Name|Age|               Photo|Nationality|                Flag|Overall|Potential|        Club|           Club Logo|  Value|

In [131]:
# .isin() works much like its pandas counterpart

fifa[fifa.Club.isin("FC Barcelona", "Juventus")].show(n= fifa.count(), truncate = False)

+-----+------+-----------------+---+----------------------------------------------+------------------+------------------------------------+-------+---------+------------+--------------------------------------------+-------+-----+-------+--------------+------------------------+---------+-----------+--------------+----------+---------+--------+-------------+------------+-----------+--------------------+------+------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+--------+---------+---------------+------------+-------+---------+-----+----------+-----------+-----------+------------+-----------+-------+---------+-------+---------+-------+-------+--------+---------+----------+-------------+-----------+------+---------+---------+-------+--------------+-------------+--------+----------+---------+-------------+----------+--------------+
|_c0  |ID    |Name             |Age|Photo                                    

In [48]:
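# Note: "FC Barcelon" is misspelled; isin() matches exact values, so only Juventus rows are returned
fifa[fifa.Club.isin("FC Barcelon", "Juventus")].limit(5).toPandas()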

Unnamed: 0,_c0,ID,Name,Age,Photo,Nationality,Flag,Overall,Potential,Club,...,Composure,Marking,StandingTackle,SlidingTackle,GKDiving,GKHandling,GKKicking,GKPositioning,GKReflexes,Release Clause
0,1,20801,Cristiano Ronaldo,33,https://cdn.sofifa.org/players/4/19/20801.png,Portugal,https://cdn.sofifa.org/flags/38.png,94,94,Juventus,...,95,28,31,23,7,11,15,14,11,€127.1M
1,15,211110,P. Dybala,24,https://cdn.sofifa.org/players/4/19/211110.png,Argentina,https://cdn.sofifa.org/flags/52.png,89,94,Juventus,...,84,23,20,20,5,4,4,5,8,€153.5M
2,24,138956,G. Chiellini,33,https://cdn.sofifa.org/players/4/19/138956.png,Italy,https://cdn.sofifa.org/flags/27.png,89,89,Juventus,...,84,93,93,90,3,3,2,4,3,€44.6M
3,64,191043,Alex Sandro,27,https://cdn.sofifa.org/players/4/19/191043.png,Brazil,https://cdn.sofifa.org/flags/54.png,86,86,Juventus,...,82,81,84,84,7,7,9,12,5,€60.2M
4,65,190483,Douglas Costa,27,https://cdn.sofifa.org/players/4/19/190483.png,Brazil,https://cdn.sofifa.org/flags/54.png,86,86,Juventus,...,84,45,38,34,13,15,9,12,5,€76.7M
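
A plain-Python sketch of the exact-match semantics of `isin()`. The `players` list and the `filter_by_club` helper are illustrative assumptions, not part of the dataset or of PySpark:

```python
# Illustrative sample rows (name, club); not loaded from the real CSV.
players = [
    ("L. Messi", "FC Barcelona"),
    ("Cristiano Ronaldo", "Juventus"),
    ("P. Dybala", "Juventus"),
    ("Neymar Jr", "Paris Saint-Germain"),
]


def filter_by_club(rows, wanted):
    """Keep the names whose club is exactly one of the wanted values."""
    wanted = set(wanted)
    return [name for name, club in rows if club in wanted]


# A misspelled value simply matches nothing; no error is raised.
with_typo = filter_by_club(players, ["FC Barcelon", "Juventus"])
without_typo = filter_by_club(players, ["FC Barcelona", "Juventus"])

print(with_typo)
print(without_typo)
```

Because a misspelled value fails silently, it can help to compare the filtered row count against what you expect.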


WHERE AND FILTER ARE THE SAME: `where()` is an alias for `filter()`.

In [147]:
fifa.select("Name", "Club")                \
    .where((fifa.Name.startswith("L")) & (fifa.Name.endswith("i")))\
    .show(fifa.count())

+----------------+--------------------+
|            Name|                Club|
+----------------+--------------------+
|        L. Messi|        FC Barcelona|
|      L. Bonucci|            Juventus|
|    L. Fabiański|     West Ham United|
|   L. Pellegrini|                Roma|
|    L. Pavoletti|            Cagliari|
|     L. Podolski|         Vissel Kobe|
|      L. Tonelli|           Sampdoria|
|   L. Rossettini|       Chievo Verona|
|        L. Zuffi|       FC Basel 1893|
|    L. Antonelli|              Empoli|
|    L. Skorupski|             Bologna|
|     L. Vangioni|           Monterrey|
| L. De Silvestri|              Torino|
|     L. Cigarini|            Cagliari|
|       L. Rigoni|               Parma|
|    L. Cavallini|           Puebla FC|
|    Léo Bonatini|Wolverhampton Wan...|
|   L. Mazzitelli|               Genoa|
|   L. Pisculichi|  Argentinos Juniors|
|       L. Sigali|         Racing Club|
|      L. Menossi| Club Atlético Tigre|
|        L. Phiri|En Avant de Guingamp|


In [146]:
# .where(), .startswith() and .endswith()
# Note: .where() calls can be chained one after another.

# fifa.select("Name", "Club").where(fifa.Name.startswith("L")).where(fifa.Name.endswith("i")).show(5)

fifa.select("Name", "Club")                \
    .filter((fifa.Name.startswith("L")) & (fifa.Name.endswith("i")))\
    .show(fifa.count())

+----------------+--------------------+
|            Name|                Club|
+----------------+--------------------+
|        L. Messi|        FC Barcelona|
|      L. Bonucci|            Juventus|
|    L. Fabiański|     West Ham United|
|   L. Pellegrini|                Roma|
|    L. Pavoletti|            Cagliari|
|     L. Podolski|         Vissel Kobe|
|      L. Tonelli|           Sampdoria|
|   L. Rossettini|       Chievo Verona|
|        L. Zuffi|       FC Basel 1893|
|    L. Antonelli|              Empoli|
|    L. Skorupski|             Bologna|
|     L. Vangioni|           Monterrey|
| L. De Silvestri|              Torino|
|     L. Cigarini|            Cagliari|
|       L. Rigoni|               Parma|
|    L. Cavallini|           Puebla FC|
|    Léo Bonatini|Wolverhampton Wan...|
|   L. Mazzitelli|               Genoa|
|   L. Pisculichi|  Argentinos Juniors|
|       L. Sigali|         Racing Club|
|      L. Menossi| Club Atlético Tigre|
|        L. Phiri|En Avant de Guingamp|
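
Chaining two `.where()` calls keeps only the rows that pass both conditions, which is the same as combining them with `&` in a single call. A plain-Python sketch of that equivalence, using a few names from the output above as an illustrative sample:

```python
# Illustrative sample of names; not the full dataset.
names = ["L. Messi", "L. Suárez", "Léo Bonatini", "Cristiano Ronaldo"]

# One filter with both conditions (like cond1 & cond2).
combined = [n for n in names if n.startswith("L") and n.endswith("i")]

# Two filters applied one after another (like .where(cond1).where(cond2)).
first_pass = [n for n in names if n.startswith("L")]
chained = [n for n in first_pass if n.endswith("i")]

print(combined)
print(chained)
```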


In [167]:
type(fifa)

pyspark.sql.dataframe.DataFrame

In [168]:
from funciones import shape

In [161]:
shape(fifa)

(18207, 89)
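
`shape` is imported from the local `funciones` module, whose source is not shown here. Below is a minimal sketch of what such a helper might look like, assuming it mirrors pandas' `DataFrame.shape` by returning `(rows, columns)`. The body is a guess, not the author's implementation:

```python
def shape(df):
    """Return (number of rows, number of columns) for a Spark DataFrame.

    Assumed implementation: it relies only on df.count() and df.columns,
    both of which exist on pyspark.sql.DataFrame.
    """
    # count() triggers a Spark job; len(df.columns) only reads metadata.
    return (df.count(), len(df.columns))
```

Under this assumption, `shape(fifa)` would give `(18207, 89)` for the DataFrame used above.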

In [158]:
fifa.limit(100).count()

100

In [159]:
fifa.count()

18207

In [170]:
# .limit() selects the number of rows

fragmento = fifa.limit(100)
fragmento.count()

100

In [171]:
# Keep only the first 5 columns

col_list = fifa.columns[:5]
fragmento = fifa.select(col_list)

In [172]:
fragmento

DataFrame[_c0: int, ID: int, Name: string, Age: int, Photo: string]

In [173]:
# new DataFrame
fragmento.show(5, False)

+---+------+-----------------+---+----------------------------------------------+
|_c0|ID    |Name             |Age|Photo                                         |
+---+------+-----------------+---+----------------------------------------------+
|0  |158023|L. Messi         |31 |https://cdn.sofifa.org/players/4/19/158023.png|
|1  |20801 |Cristiano Ronaldo|33 |https://cdn.sofifa.org/players/4/19/20801.png |
|2  |190871|Neymar Jr        |26 |https://cdn.sofifa.org/players/4/19/190871.png|
|3  |193080|De Gea           |27 |https://cdn.sofifa.org/players/4/19/193080.png|
|4  |192985|K. De Bruyne     |27 |https://cdn.sofifa.org/players/4/19/192985.png|
+---+------+-----------------+---+----------------------------------------------+
only showing top 5 rows



In [175]:
# .filter(condition) == .where(condition)

fifa.filter("Overall > 50").limit(5).toPandas()

Unnamed: 0,_c0,ID,Name,Age,Photo,Nationality,Flag,Overall,Potential,Club,...,Composure,Marking,StandingTackle,SlidingTackle,GKDiving,GKHandling,GKKicking,GKPositioning,GKReflexes,Release Clause
0,0,158023,L. Messi,31,https://cdn.sofifa.org/players/4/19/158023.png,Argentina,https://cdn.sofifa.org/flags/52.png,94,94,FC Barcelona,...,96,33,28,26,6,11,15,14,8,€226.5M
1,1,20801,Cristiano Ronaldo,33,https://cdn.sofifa.org/players/4/19/20801.png,Portugal,https://cdn.sofifa.org/flags/38.png,94,94,Juventus,...,95,28,31,23,7,11,15,14,11,€127.1M
2,2,190871,Neymar Jr,26,https://cdn.sofifa.org/players/4/19/190871.png,Brazil,https://cdn.sofifa.org/flags/54.png,92,93,Paris Saint-Germain,...,94,27,24,33,9,9,15,15,11,€228.1M
3,3,193080,De Gea,27,https://cdn.sofifa.org/players/4/19/193080.png,Spain,https://cdn.sofifa.org/flags/45.png,91,93,Manchester United,...,68,15,21,13,90,85,87,88,94,€138.6M
4,4,192985,K. De Bruyne,27,https://cdn.sofifa.org/players/4/19/192985.png,Belgium,https://cdn.sofifa.org/flags/7.png,91,92,Manchester City,...,88,68,58,51,15,13,5,10,13,€196.4M


In [183]:
fifa.filter(fifa['Age'].between(19, 30))\
    .select("age", "name").toPandas()

| | age | name |
|---|---|---|
| 0 | 26 | Neymar Jr |
| 1 | 27 | De Gea |
| 2 | 27 | K. De Bruyne |
| 3 | 27 | E. Hazard |
| 4 | 25 | J. Oblak |
| ... | ... | ... |
| 14557 | 20 | Zhang Yufeng |
| 14558 | 19 | C. Ehlich |
| 14559 | 19 | K. Fujikawa |
| 14560 | 19 | J. Lundstram |
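
`between(19, 30)` includes both bounds, so it behaves like `(Age >= 19) & (Age <= 30)`. A plain-Python sketch over an illustrative list of ages (the values are made up for the example):

```python
# Illustrative ages; 19 and 30 sit exactly on the bounds.
ages = [18, 19, 26, 30, 31]

# Inclusive range check, the plain-Python analog of between(19, 30).
selected = [age for age in ages if 19 <= age <= 30]

print(selected)
```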


In [180]:
fifa.filter((fifa['Overall'] > 19) & (fifa["Overall"] < 50)).toPandas()

Unnamed: 0,_c0,ID,Name,Age,Photo,Nationality,Flag,Overall,Potential,Club,...,Composure,Marking,StandingTackle,SlidingTackle,GKDiving,GKHandling,GKKicking,GKPositioning,GKReflexes,Release Clause
0,18118,243725,D. Collins,17,https://cdn.sofifa.org/players/4/19/243725.png,Republic of Ireland,https://cdn.sofifa.org/flags/25.png,49,62,Sligo Rovers,...,50,39,29,27,6,9,5,13,8,€109K
1,18119,240668,J. Egan,19,https://cdn.sofifa.org/players/4/19/240668.png,England,https://cdn.sofifa.org/flags/14.png,49,62,Carlisle United,...,42,23,25,27,12,9,10,8,12,€143K
2,18120,241443,Xie Xiaofan,20,https://cdn.sofifa.org/players/4/19/241443.png,China PR,https://cdn.sofifa.org/flags/155.png,49,61,Jiangsu Suning FC,...,42,53,40,42,13,12,7,12,12,€118K
3,18121,246051,B. Buckley,17,https://cdn.sofifa.org/players/4/19/246051.png,England,https://cdn.sofifa.org/flags/14.png,49,61,Grimsby Town,...,49,30,40,40,10,8,14,11,11,€98K
4,18122,245542,G. Figliuzzi,17,https://cdn.sofifa.org/players/4/19/245542.png,Italy,https://cdn.sofifa.org/flags/27.png,49,69,Crotone,...,36,6,11,11,49,51,44,45,50,€130K
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
84,18202,238813,J. Lundstram,19,https://cdn.sofifa.org/players/4/19/238813.png,England,https://cdn.sofifa.org/flags/14.png,47,65,Crewe Alexandra,...,45,40,48,47,10,13,7,8,9,€143K
85,18203,243165,N. Christoffersson,19,https://cdn.sofifa.org/players/4/19/243165.png,Sweden,https://cdn.sofifa.org/flags/46.png,47,63,Trelleborgs FF,...,42,22,15,19,10,9,9,5,12,€113K
86,18204,241638,B. Worman,16,https://cdn.sofifa.org/players/4/19/241638.png,England,https://cdn.sofifa.org/flags/14.png,47,67,Cambridge United,...,41,32,13,11,6,5,10,6,13,€165K
87,18205,246268,D. Walker-Rice,17,https://cdn.sofifa.org/players/4/19/246268.png,England,https://cdn.sofifa.org/flags/14.png,47,66,Tranmere Rovers,...,46,20,25,27,14,6,14,8,9,€143K


In [59]:
fifa.filter(col('Overall') > 50).limit(5).toPandas()

Unnamed: 0,_c0,ID,Name,Age,Photo,Nationality,Flag,Overall,Potential,Club,...,Composure,Marking,StandingTackle,SlidingTackle,GKDiving,GKHandling,GKKicking,GKPositioning,GKReflexes,Release Clause
0,0,158023,L. Messi,31,https://cdn.sofifa.org/players/4/19/158023.png,Argentina,https://cdn.sofifa.org/flags/52.png,94,94,FC Barcelona,...,96,33,28,26,6,11,15,14,8,€226.5M
1,1,20801,Cristiano Ronaldo,33,https://cdn.sofifa.org/players/4/19/20801.png,Portugal,https://cdn.sofifa.org/flags/38.png,94,94,Juventus,...,95,28,31,23,7,11,15,14,11,€127.1M
2,2,190871,Neymar Jr,26,https://cdn.sofifa.org/players/4/19/190871.png,Brazil,https://cdn.sofifa.org/flags/54.png,92,93,Paris Saint-Germain,...,94,27,24,33,9,9,15,15,11,€228.1M
3,3,193080,De Gea,27,https://cdn.sofifa.org/players/4/19/193080.png,Spain,https://cdn.sofifa.org/flags/45.png,91,93,Manchester United,...,68,15,21,13,90,85,87,88,94,€138.6M
4,4,192985,K. De Bruyne,27,https://cdn.sofifa.org/players/4/19/192985.png,Belgium,https://cdn.sofifa.org/flags/7.png,91,92,Manchester City,...,88,68,58,51,15,13,5,10,13,€196.4M


### QUERY ORDER

  + SELECT
  + FILTER
  + ORDERBY
  + SHOW

In [62]:
%%time
# We can use .filter together with .select

fifa.filter("Overall > 50").select(["Name", "Age"]).toPandas()

CPU times: total: 141 ms
Wall time: 457 ms


| | Name | Age |
|---|---|---|
| 0 | L. Messi | 31 |
| 1 | Cristiano Ronaldo | 33 |
| 2 | Neymar Jr | 26 |
| 3 | De Gea | 27 |
| 4 | K. De Bruyne | 27 |
| ... | ... | ... |
| 18010 | Zhu Zhengyu | 23 |
| 18011 | J. Ellis | 17 |
| 18012 | B. Galach | 17 |
| 18013 | W. Møller | 20 |


In [63]:
%%time
# The order of .select and .filter does not affect the output

fifa.select(["Name", "Age"]).filter("Overall > 50").toPandas()

CPU times: total: 15.6 ms
Wall time: 187 ms


| | Name | Age |
|---|---|---|
| 0 | L. Messi | 31 |
| 1 | Cristiano Ronaldo | 33 |
| 2 | Neymar Jr | 26 |
| 3 | De Gea | 27 |
| 4 | K. De Bruyne | 27 |
| ... | ... | ... |
| 18010 | Zhu Zhengyu | 23 |
| 18011 | J. Ellis | 17 |
| 18012 | B. Galach | 17 |
| 18013 | W. Møller | 20 |


In [64]:
# Multiple conditions combined with AND & OR

fifa.select(["Name", "Age", "Club"]).filter("Overall > 50 AND Age < 30 AND Club = 'FC Barcelona'").limit(5).toPandas()

| | Name | Age | Club |
|---|---|---|---|
| 0 | M. ter Stegen | 26 | FC Barcelona |
| 1 | Sergio Busquets | 29 | FC Barcelona |
| 2 | Coutinho | 26 | FC Barcelona |
| 3 | S. Umtiti | 24 | FC Barcelona |
| 4 | Jordi Alba | 29 | FC Barcelona |


In [184]:
fifa.select(["Name", "Age", "Club"])\
    .filter(
            (col('Overall') > 50) & 
            (col('Age') < 30) & 
            (col('Club') == 'FC Barcelona')
            )\
    .limit(5).toPandas()

| | Name | Age | Club |
|---|---|---|---|
| 0 | M. ter Stegen | 26 | FC Barcelona |
| 1 | Sergio Busquets | 29 | FC Barcelona |
| 2 | Coutinho | 26 | FC Barcelona |
| 3 | S. Umtiti | 24 | FC Barcelona |
| 4 | Jordi Alba | 29 | FC Barcelona |


In [187]:
fifa.select(["Name", "Age", "Club"])\
    .filter("Club = 'Juventus' OR Club = 'FC Barcelona'")\
    .limit(5).toPandas()

| | Name | Age | Club |
|---|---|---|---|
| 0 | L. Messi | 31 | FC Barcelona |
| 1 | Cristiano Ronaldo | 33 | Juventus |
| 2 | L. Suárez | 31 | FC Barcelona |
| 3 | P. Dybala | 24 | Juventus |
| 4 | M. ter Stegen | 26 | FC Barcelona |


In [186]:
fifa.select('Name','Age','Club')\
    .filter(
        (col('Club')=='Juventus')
        | 
        (col('Club')=='FC Barcelona')
    )\
    .limit(5).toPandas()

| | Name | Age | Club |
|---|---|---|---|
| 0 | L. Messi | 31 | FC Barcelona |
| 1 | Cristiano Ronaldo | 33 | Juventus |
| 2 | L. Suárez | 31 | FC Barcelona |
| 3 | P. Dybala | 24 | Juventus |
| 4 | M. ter Stegen | 26 | FC Barcelona |


In [205]:
# .collect() brings the result back to the driver as a Python list of Row objects

result = fifa.select(["Nationality", "Name", "Age", "Overall"])\
             .filter("Overall > 50")\
             .orderBy(fifa["Overall"].desc())\
             .collect()

result

[Row(Nationality='Argentina', Name='L. Messi', Age=31, Overall=94),
 Row(Nationality='Portugal', Name='Cristiano Ronaldo', Age=33, Overall=94),
 Row(Nationality='Brazil', Name='Neymar Jr', Age=26, Overall=92),
 Row(Nationality='Spain', Name='De Gea', Age=27, Overall=91),
 Row(Nationality='Belgium', Name='K. De Bruyne', Age=27, Overall=91),
 Row(Nationality='Belgium', Name='E. Hazard', Age=27, Overall=91),
 Row(Nationality='Croatia', Name='L. Modrić', Age=32, Overall=91),
 Row(Nationality='Uruguay', Name='L. Suárez', Age=31, Overall=91),
 Row(Nationality='Spain', Name='Sergio Ramos', Age=32, Overall=91),
 Row(Nationality='Slovenia', Name='J. Oblak', Age=25, Overall=90),
 Row(Nationality='Poland', Name='R. Lewandowski', Age=29, Overall=90),
 Row(Nationality='Germany', Name='T. Kroos', Age=28, Overall=90),
 Row(Nationality='Uruguay', Name='D. Godín', Age=32, Overall=90),
 Row(Nationality='Spain', Name='David Silva', Age=32, Overall=90),
 Row(Nationality='France', Name='N. Kanté', Age=27, 

In [192]:
result_values_only = [row[:] for row in result]

In [193]:
result_values_only

[('Argentina', 'L. Messi', 31, 94),
 ('Portugal', 'Cristiano Ronaldo', 33, 94),
 ('Brazil', 'Neymar Jr', 26, 92),
 ('Spain', 'De Gea', 27, 91),
 ('Belgium', 'K. De Bruyne', 27, 91)]

In [198]:
result[0][1]

'L. Messi'

In [200]:
# result
print("Best player with Overall > 50:", result[0][1])

Best player with Overall > 50: L. Messi


In [207]:
# result
print("Worst player with Overall > 50:", result[-1][1])

Worst player with Overall > 50: C. Addai


### DataFrame manipulation

In [227]:
# concat_ws()

fragmento = fifa.select(
            concat_ws(" --> ", fifa.Name, fifa.Nationality).alias("Nombre/Nacionalidad"), fifa.Age
            )

fragmento.show(truncate = False)

+------------------------------+---+
|Nombre/Nacionalidad           |Age|
+------------------------------+---+
|L. Messi --> Argentina        |31 |
|Cristiano Ronaldo --> Portugal|33 |
|Neymar Jr --> Brazil          |26 |
|De Gea --> Spain              |27 |
|K. De Bruyne --> Belgium      |27 |
|E. Hazard --> Belgium         |27 |
|L. Modrić --> Croatia         |32 |
|L. Suárez --> Uruguay         |31 |
|Sergio Ramos --> Spain        |32 |
|J. Oblak --> Slovenia         |25 |
|R. Lewandowski --> Poland     |29 |
|T. Kroos --> Germany          |28 |
|D. Godín --> Uruguay          |32 |
|David Silva --> Spain         |32 |
|N. Kanté --> France           |27 |
|P. Dybala --> Argentina       |24 |
|H. Kane --> England           |24 |
|A. Griezmann --> France       |27 |
|M. ter Stegen --> Germany     |26 |
|T. Courtois --> Belgium       |26 |
+------------------------------+---+
only showing top 20 rows
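
`concat_ws(sep, *cols)` joins the column values with the separator and skips null values instead of returning null for the whole row. A plain-Python sketch of that behavior, where `None` stands in for SQL NULL and `concat_ws_like` is an illustrative name, not a PySpark function:

```python
def concat_ws_like(sep, *values):
    """Join the non-null values with sep, converting each one to str."""
    return sep.join(str(v) for v in values if v is not None)


# Two string columns, as in the Name/Nationality example.
print(concat_ws_like(" --> ", "L. Messi", "Argentina"))

# A null value is skipped, so no dangling separator appears.
print(concat_ws_like(" // ", "L. Messi", None, 31, "FC Barcelona"))
```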



In [223]:
concat = fifa.select('Name','Nationality','Club','Age')\
             .withColumn('Características', 
                         concat_ws(' // ', col('Name'),col('Nationality'),col('Age'),col('Club'))
                        )

concat.show(truncate=False)

+-----------------+-----------+-------------------+---+---------------------------------------------------+
|Name             |Nationality|Club               |Age|Características                                    |
+-----------------+-----------+-------------------+---+---------------------------------------------------+
|L. Messi         |Argentina  |FC Barcelona       |31 |L. Messi // Argentina // 31 // FC Barcelona        |
|Cristiano Ronaldo|Portugal   |Juventus           |33 |Cristiano Ronaldo // Portugal // 33 // Juventus    |
|Neymar Jr        |Brazil     |Paris Saint-Germain|26 |Neymar Jr // Brazil // 26 // Paris Saint-Germain   |
|De Gea           |Spain      |Manchester United  |27 |De Gea // Spain // 27 // Manchester United         |
|K. De Bruyne     |Belgium    |Manchester City    |27 |K. De Bruyne // Belgium // 27 // Manchester City   |
|E. Hazard        |Belgium    |Chelsea            |27 |E. Hazard // Belgium // 27 // Chelsea              |
|L. Modrić        |Croatia  

In [226]:
concat.rdd.id()

799

In [228]:
fragmento.rdd.id()

809

In [229]:
# New DataFrame

videos = spark.read.csv(path = "../02.data/youtubevideos.csv",
                        header = True, inferSchema = True)

videos.limit(3).toPandas()

Unnamed: 0,video_id,trending_date,title,channel_title,category_id,publish_time,tags,views,likes,dislikes,comment_count,thumbnail_link,comments_disabled,ratings_disabled,video_error_or_removed,description
0,2kyS6SvSYSE,17.14.11,WE WANT TO TALK ABOUT OUR MARRIAGE,CaseyNeistat,22,2017-11-13T17:13:01.000Z,SHANtell martin,748374,57527,2966,15954,https://i.ytimg.com/vi/2kyS6SvSYSE/default.jpg,False,False,False,SHANTELL'S CHANNEL - https://www.youtube.com/s...
1,1ZAPwfrtAFY,17.14.11,The Trump Presidency: Last Week Tonight with J...,LastWeekTonight,24,2017-11-13T07:30:00.000Z,"""last week tonight trump presidency""|""last wee...",2418783,97185,6146,12703,https://i.ytimg.com/vi/1ZAPwfrtAFY/default.jpg,False,False,False,"One year after the presidential election, John..."
2,5qpjK5DgCt4,17.14.11,"Racist Superman | Rudy Mancuso, King Bach & Le...",Rudy Mancuso,23,2017-11-12T19:05:24.000Z,"""racist superman""|""rudy""|""mancuso""|""king""|""bac...",3191434,146033,5339,8181,https://i.ytimg.com/vi/5qpjK5DgCt4/default.jpg,False,False,False,WATCH MY PREVIOUS VIDEO ▶ \n\nSUBSCRIBE ► http...


In [93]:
videos.printSchema()

root
 |-- video_id: string (nullable = true)
 |-- trending_date: string (nullable = true)
 |-- title: string (nullable = true)
 |-- channel_title: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- publish_time: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- views: string (nullable = true)
 |-- likes: string (nullable = true)
 |-- dislikes: string (nullable = true)
 |-- comment_count: string (nullable = true)
 |-- thumbnail_link: string (nullable = true)
 |-- comments_disabled: string (nullable = true)
 |-- ratings_disabled: string (nullable = true)
 |-- video_error_or_removed: string (nullable = true)
 |-- description: string (nullable = true)



In [233]:
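# We can reassign columns using .withColumn together with .cast, to_date or to_timestamp
# Caution: in Spark datetime patterns "mm" is minute-of-hour and "MM" is month-of-year.
# The raw strings look like year.day.month (e.g. 17.14.11), so "yy.dd.MM" is probably the
# intended pattern; with "yy.mm.dd" every `fecha` value comes out in January, as the output shows.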

df = videos.withColumn("views"        , videos["views"].cast(IntegerType()))                        \
           .withColumn("likes"        , videos["likes"].cast(IntegerType()))                        \
           .withColumn("dislikes"     , videos["dislikes"].cast(IntegerType()))                     \
           .withColumn("category_id"  , videos["category_id"].cast(IntegerType()))                  \
           .withColumn("fecha", to_date(videos["trending_date"], "yy.mm.dd")) 

In [234]:
df.printSchema()

root
 |-- video_id: string (nullable = true)
 |-- trending_date: string (nullable = true)
 |-- title: string (nullable = true)
 |-- channel_title: string (nullable = true)
 |-- category_id: integer (nullable = true)
 |-- publish_time: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- views: integer (nullable = true)
 |-- likes: integer (nullable = true)
 |-- dislikes: integer (nullable = true)
 |-- comment_count: string (nullable = true)
 |-- thumbnail_link: string (nullable = true)
 |-- comments_disabled: string (nullable = true)
 |-- ratings_disabled: string (nullable = true)
 |-- video_error_or_removed: string (nullable = true)
 |-- description: string (nullable = true)
 |-- fecha: date (nullable = true)



In [235]:
df.limit(3).toPandas()

Unnamed: 0,video_id,trending_date,title,channel_title,category_id,publish_time,tags,views,likes,dislikes,comment_count,thumbnail_link,comments_disabled,ratings_disabled,video_error_or_removed,description,fecha
0,2kyS6SvSYSE,17.14.11,WE WANT TO TALK ABOUT OUR MARRIAGE,CaseyNeistat,22,2017-11-13T17:13:01.000Z,SHANtell martin,748374,57527,2966,15954,https://i.ytimg.com/vi/2kyS6SvSYSE/default.jpg,False,False,False,SHANTELL'S CHANNEL - https://www.youtube.com/s...,2017-01-11
1,1ZAPwfrtAFY,17.14.11,The Trump Presidency: Last Week Tonight with J...,LastWeekTonight,24,2017-11-13T07:30:00.000Z,"""last week tonight trump presidency""|""last wee...",2418783,97185,6146,12703,https://i.ytimg.com/vi/1ZAPwfrtAFY/default.jpg,False,False,False,"One year after the presidential election, John...",2017-01-11
2,5qpjK5DgCt4,17.14.11,"Racist Superman | Rudy Mancuso, King Bach & Le...",Rudy Mancuso,23,2017-11-12T19:05:24.000Z,"""racist superman""|""rudy""|""mancuso""|""king""|""bac...",3191434,146033,5339,8181,https://i.ytimg.com/vi/5qpjK5DgCt4/default.jpg,False,False,False,WATCH MY PREVIOUS VIDEO ▶ \n\nSUBSCRIBE ► http...,2017-01-11
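
A quick standard-library check of how the raw `trending_date` strings appear to be laid out, assuming they follow year.day.month (so `17.14.11` would be 14 November 2017). Here `%y.%d.%m` plays the role of the Spark pattern `yy.dd.MM`:

```python
from datetime import datetime

raw = "17.14.11"  # first trending_date value shown above

# %y = two-digit year, %d = day of month, %m = month number.
parsed = datetime.strptime(raw, "%y.%d.%m").date()

print(parsed)  # 2017-11-14
```

That date falls one day after the video's `publish_time` (2017-11-13), which supports the assumed layout.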


In [238]:
df.count()

48137

In [241]:
# .withColumn() also lets us create columns from other columns

df = df.withColumn("publish_time_2", regexp_replace(df.publish_time, "T", " "))
df = df.withColumn("publish_time_2", regexp_replace(df.publish_time_2, "Z", ""))

df.select("*").show(n = 5, truncate = False)

+-----------+-------------+--------------------------------------------------------------+---------------------+-----------+------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------+------+--------+-------------+----------------------------------------------+-----------------+----------------+----------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
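
The two `regexp_replace` calls above replace every `T` with a space and then remove every `Z`. The same two steps in plain Python with `re.sub`, applied to the first `publish_time` value as a rough analog rather than the Spark implementation:

```python
import re

publish_time = "2017-11-13T17:13:01.000Z"

# Step 1: replace the date/time separator "T" with a space.
step1 = re.sub("T", " ", publish_time)

# Step 2: drop the trailing "Z" (UTC marker).
publish_time_2 = re.sub("Z", "", step1)

print(publish_time_2)  # 2017-11-13 17:13:01.000
```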

In [98]:
# lower()
df.select("title", lower(df.title)).show(5, False)

+--------------------------------------------------------------+--------------------------------------------------------------+
|title                                                         |lower(title)                                                  |
+--------------------------------------------------------------+--------------------------------------------------------------+
|WE WANT TO TALK ABOUT OUR MARRIAGE                            |we want to talk about our marriage                            |
|The Trump Presidency: Last Week Tonight with John Oliver (HBO)|the trump presidency: last week tonight with john oliver (hbo)|
|Racist Superman | Rudy Mancuso, King Bach & Lele Pons         |racist superman | rudy mancuso, king bach & lele pons         |
|Nickelback Lyrics: Real or Fake?                              |nickelback lyrics: real or fake?                              |
|I Dare You: GOING BALD!?                                      |i dare you: going bald!?                

In [242]:
df = df.withColumn("lower(title)", lower(df.title))

In [244]:
df.limit(5).toPandas()

Unnamed: 0,video_id,trending_date,title,channel_title,category_id,publish_time,tags,views,likes,dislikes,comment_count,thumbnail_link,comments_disabled,ratings_disabled,video_error_or_removed,description,fecha,publish_time_2,lower(title)
0,2kyS6SvSYSE,17.14.11,WE WANT TO TALK ABOUT OUR MARRIAGE,CaseyNeistat,22,2017-11-13T17:13:01.000Z,SHANtell martin,748374,57527,2966,15954,https://i.ytimg.com/vi/2kyS6SvSYSE/default.jpg,False,False,False,SHANTELL'S CHANNEL - https://www.youtube.com/s...,2017-01-11,2017-11-13 17:13:01.000,we want to talk about our marriage
1,1ZAPwfrtAFY,17.14.11,The Trump Presidency: Last Week Tonight with J...,LastWeekTonight,24,2017-11-13T07:30:00.000Z,"""last week tonight trump presidency""|""last wee...",2418783,97185,6146,12703,https://i.ytimg.com/vi/1ZAPwfrtAFY/default.jpg,False,False,False,"One year after the presidential election, John...",2017-01-11,2017-11-13 07:30:00.000,the trump presidency: last week tonight with j...
2,5qpjK5DgCt4,17.14.11,"Racist Superman | Rudy Mancuso, King Bach & Le...",Rudy Mancuso,23,2017-11-12T19:05:24.000Z,"""racist superman""|""rudy""|""mancuso""|""king""|""bac...",3191434,146033,5339,8181,https://i.ytimg.com/vi/5qpjK5DgCt4/default.jpg,False,False,False,WATCH MY PREVIOUS VIDEO ▶ \n\nSUBSCRIBE ► http...,2017-01-11,2017-11-12 19:05:24.000,"racist superman | rudy mancuso, king bach & le..."
3,puqaWrEC7tY,17.14.11,Nickelback Lyrics: Real or Fake?,Good Mythical Morning,24,2017-11-13T11:00:04.000Z,"""rhett and link""|""gmm""|""good mythical morning""...",343168,10172,666,2146,https://i.ytimg.com/vi/puqaWrEC7tY/default.jpg,False,False,False,Today we find out if Link is a Nickelback amat...,2017-01-11,2017-11-13 11:00:04.000,nickelback lyrics: real or fake?
4,d380meD0W0M,17.14.11,I Dare You: GOING BALD!?,nigahiga,24,2017-11-12T18:01:41.000Z,"""ryan""|""higa""|""higatv""|""nigahiga""|""i dare you""...",2095731,132235,1989,17518,https://i.ytimg.com/vi/d380meD0W0M/default.jpg,False,False,False,I know it's been a while since we did this sho...,2017-01-11,2017-11-12 18:01:41.000,i dare you: going bald!?


In [246]:
# when() can create a column from other columns whenever a given condition holds

df.select("likes", "dislikes",
         (
          when(df.likes > df.dislikes, "Good")\
         .when(df.likes < df.dislikes, "Bad")\
         .when(df.likes == df.dislikes, "Equal")\
         .otherwise("Undetermined")).alias("Favorability")
         )\
    .show(5)

# otherwise() applies when none of the conditions holds, which can happen, for example, when a value is null

+------+--------+------------+
| likes|dislikes|Favorability|
+------+--------+------------+
| 57527|    2966|        Good|
| 97185|    6146|        Good|
|146033|    5339|        Good|
| 10172|     666|        Good|
|132235|    1989|        Good|
+------+--------+------------+
only showing top 5 rows



In [101]:
# expr

# with expr we can define the new column using SQL syntax

(
    df
    .select("likes",
          "dislikes",
          expr("CASE WHEN likes > dislikes THEN 'Good' \
                     WHEN dislikes > likes THEN 'Bad'  \
                     WHEN likes = dislikes THEN 'Equal'\
                     ELSE 'Undetermined' END           \
                AS Favorability")
         )
    .groupBy('Favorability')
    .count()
).show(5)

+------------+-----+
|Favorability|count|
+------------+-----+
|       Equal|  181|
|        Good|40192|
|         Bad|  576|
|Undetermined| 7188|
+------------+-----+
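
The `Undetermined` bucket collects the rows where none of the three comparisons is true. For integer columns that can only happen when `likes` or `dislikes` is NULL, because a comparison with NULL is neither true nor false. A plain-Python sketch of the same decision logic, with `None` standing in for NULL (the function name is illustrative):

```python
def favorability(likes, dislikes):
    """Mimic the CASE WHEN expression, treating None like SQL NULL."""
    if likes is None or dislikes is None:
        # No comparison can succeed, so we fall through to the ELSE branch.
        return "Undetermined"
    if likes > dislikes:
        return "Good"
    if likes < dislikes:
        return "Bad"
    return "Equal"


print(favorability(57527, 2966))
print(favorability(None, 2966))
```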



In [106]:
# year(), month(), dayofmonth(), dayofweek() & dayofyear()
# This works because the column is in DateType() format (here trending_date had already been converted to a date)

df.select("trending_date",
          year("trending_date").alias("year"),
          month("trending_date").alias("month"),
          dayofmonth("trending_date").alias("day"),
          dayofweek("trending_date").alias("day_of_week"),
          dayofyear("trending_date").alias("day_of_year")
         ).show(5)

+-------------+----+-----+---+-----------+-----------+
|trending_date|year|month|day|day_of_week|day_of_year|
+-------------+----+-----+---+-----------+-----------+
|   2017-01-14|2017|    1| 14|          7|         14|
|   2017-01-14|2017|    1| 14|          7|         14|
|   2017-01-14|2017|    1| 14|          7|         14|
|   2017-01-14|2017|    1| 14|          7|         14|
|   2017-01-14|2017|    1| 14|          7|         14|
+-------------+----+-----+---+-----------+-----------+
only showing top 5 rows
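
The `day_of_week` value of 7 for 2017-01-14 follows Spark's convention: `dayofweek()` numbers the days from 1 (Sunday) to 7 (Saturday), while Python's `date.isoweekday()` runs from 1 (Monday) to 7 (Sunday). The sketch below uses only the standard library to reproduce the values in the table; the helper name `spark_dayofweek` is made up for illustration and is not part of PySpark.

```python
from datetime import date

def spark_dayofweek(d: date) -> int:
    """Map a date to Spark's dayofweek() convention: 1 = Sunday ... 7 = Saturday."""
    # isoweekday() returns 1 = Monday ... 7 = Sunday
    return d.isoweekday() % 7 + 1

trending = date(2017, 1, 14)               # a Saturday
day_of_week = spark_dayofweek(trending)
day_of_year = trending.timetuple().tm_yday

print(trending.year, trending.month, trending.day)  # 2017 1 14
print(day_of_week)                                   # 7
print(day_of_year)                                   # 14
```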



In [107]:
# datediff()
# This works because the columns hold date/timestamp values
# datediff(end, start) returns the number of days from start to end

df.select("trending_date",
          "publish_time_2",
          datediff(df.publish_time_2, df.trending_date)).show(10, False)

+-------------+-----------------------+---------------------------------------+
|trending_date|publish_time_2         |datediff(publish_time_2, trending_date)|
+-------------+-----------------------+---------------------------------------+
|2017-01-14   |2017-11-13 17:13:01.000|303                                    |
|2017-01-14   |2017-11-13 07:30:00.000|303                                    |
|2017-01-14   |2017-11-12 19:05:24.000|302                                    |
|2017-01-14   |2017-11-13 11:00:04.000|303                                    |
|2017-01-14   |2017-11-12 18:01:41.000|302                                    |
|2017-01-14   |2017-11-13 19:07:23.000|303                                    |
|2017-01-14   |2017-11-12 05:37:17.000|302                                    |
|2017-01-14   |2017-11-12 21:50:37.000|302                                    |
|2017-01-14   |2017-11-13 14:00:23.000|303                                    |
|2017-01-14   |2017-11-13 13:45:16.000|3
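
The results above show that `datediff` counts whole calendar days and ignores the time of day: every timestamp on 2017-11-13 gives 303, regardless of the hour. A minimal standard-library sketch of the same arithmetic follows; `to_date` and `days_between` are illustrative helpers, not PySpark functions.

```python
from datetime import date, datetime

def to_date(value):
    """Drop the time of day from a datetime; leave plain dates untouched."""
    return value.date() if isinstance(value, datetime) else value

def days_between(end, start):
    """Whole calendar days from start to end (negative if end is earlier)."""
    return (to_date(end) - to_date(start)).days

trending_date = date(2017, 1, 14)
late_publish = datetime(2017, 11, 13, 17, 13, 1)
early_publish = datetime(2017, 11, 12, 19, 5, 24)

print(days_between(late_publish, trending_date))   # 303
print(days_between(early_publish, trending_date))  # 302
print(days_between(trending_date, late_publish))   # -303
```

Swapping the arguments flips the sign, so the order passed to `datediff` matters.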

In [108]:
# split()
array = df.select("title",
                  split(df.title, " ").alias("split"))

array.show(5, False)

+--------------------------------------------------------------+-------------------------------------------------------------------------+
|title                                                         |split                                                                    |
+--------------------------------------------------------------+-------------------------------------------------------------------------+
|WE WANT TO TALK ABOUT OUR MARRIAGE                            |[WE, WANT, TO, TALK, ABOUT, OUR, MARRIAGE]                               |
|The Trump Presidency: Last Week Tonight with John Oliver (HBO)|[The, Trump, Presidency:, Last, Week, Tonight, with, John, Oliver, (HBO)]|
|Racist Superman | Rudy Mancuso, King Bach & Lele Pons         |[Racist, Superman, |, Rudy, Mancuso,, King, Bach, &, Lele, Pons]         |
|Nickelback Lyrics: Real or Fake?                              |[Nickelback, Lyrics:, Real, or, Fake?]                                   |
|I Dare You: GOING BALD!?  

In [109]:
# array_contains is similar to the "in" operator in Python

array.select("split",
             array_contains(array.split, "(HBO)")).show(5, False)

+-------------------------------------------------------------------------+----------------------------+
|split                                                                    |array_contains(split, (HBO))|
+-------------------------------------------------------------------------+----------------------------+
|[WE, WANT, TO, TALK, ABOUT, OUR, MARRIAGE]                               |false                       |
|[The, Trump, Presidency:, Last, Week, Tonight, with, John, Oliver, (HBO)]|true                        |
|[Racist, Superman, |, Rudy, Mancuso,, King, Bach, &, Lele, Pons]         |false                       |
|[Nickelback, Lyrics:, Real, or, Fake?]                                   |false                       |
|[I, Dare, You:, GOING, BALD!?]                                           |false                       |
+-------------------------------------------------------------------------+----------------------------+
only showing top 5 rows



In [114]:
# Flag the rows whose split array contains "(HBO)" in a column named checks,
# keep only the rows where that flag is true (while the column still exists),
# and finally select the distinct values of the split column

(
    array
    .select(
        "split",
        array_contains(
            array.split, 
            "(HBO)"
        ).alias('checks')
    )
    .filter(col('checks'))
    .select('split').distinct()
    .show(5, False)
)

+-------------------------------------------------------------------------------------+
|split                                                                                |
+-------------------------------------------------------------------------------------+
|[The, Trump, Presidency:, Last, Week, Tonight, with, John, Oliver, (HBO)]            |
|[What, It's, Like, To, Be, Absolutely, Obsessed, With, Bitcoin, (HBO)]               |
|[Watch, Silicon, Valley, Nerds, Face, Off, A, Capella, (HBO)]                        |
|[Last, Week, Tonight:, Season, 5, Official, Trailer, (HBO)]                          |
|[This, Hidden, 300, Foot, Stretch, Of, The, Berlin, Wall, Is, Still, Standing, (HBO)]|
+-------------------------------------------------------------------------------------+
only showing top 5 rows



In [115]:
# array_distinct is similar to .unique() in pandas: it removes duplicates inside each array

array.select("title", array_distinct(array.split)).show(10, False)

+-----------------------------------------------------------------+---------------------------------------------------------------------------+
|title                                                            |array_distinct(split)                                                      |
+-----------------------------------------------------------------+---------------------------------------------------------------------------+
|WE WANT TO TALK ABOUT OUR MARRIAGE                               |[WE, WANT, TO, TALK, ABOUT, OUR, MARRIAGE]                                 |
|The Trump Presidency: Last Week Tonight with John Oliver (HBO)   |[The, Trump, Presidency:, Last, Week, Tonight, with, John, Oliver, (HBO)]  |
|Racist Superman | Rudy Mancuso, King Bach & Lele Pons            |[Racist, Superman, |, Rudy, Mancuso,, King, Bach, &, Lele, Pons]           |
|Nickelback Lyrics: Real or Fake?                                 |[Nickelback, Lyrics:, Real, or, Fake?]                               

In [116]:
# array_remove removes every occurrence of an element from an array

array.select("title", array_remove(array.split, "Presidency:")).show(5, False)

+--------------------------------------------------------------+----------------------------------------------------------------+
|title                                                         |array_remove(split, Presidency:)                                |
+--------------------------------------------------------------+----------------------------------------------------------------+
|WE WANT TO TALK ABOUT OUR MARRIAGE                            |[WE, WANT, TO, TALK, ABOUT, OUR, MARRIAGE]                      |
|The Trump Presidency: Last Week Tonight with John Oliver (HBO)|[The, Trump, Last, Week, Tonight, with, John, Oliver, (HBO)]    |
|Racist Superman | Rudy Mancuso, King Bach & Lele Pons         |[Racist, Superman, |, Rudy, Mancuso,, King, Bach, &, Lele, Pons]|
|Nickelback Lyrics: Real or Fake?                              |[Nickelback, Lyrics:, Real, or, Fake?]                          |
|I Dare You: GOING BALD!?                                      |[I, Dare, You:, GOING, BAL
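
For readers coming from plain Python, the array functions used in the previous cells have close analogues on ordinary lists. The sketch below is only an analogy applied to a single title string, not PySpark code; note also that Spark's `split` takes a regular expression, while `str.split` takes a literal separator (for a single space the two agree).

```python
title = "The Trump Presidency: Last Week Tonight with John Oliver (HBO)"

# split(df.title, " ")
words = title.split(" ")

# array_contains(split, "(HBO)")
has_hbo = "(HBO)" in words

# array_remove(split, "Presidency:") drops every matching element
without_presidency = [w for w in words if w != "Presidency:"]

# array_distinct(split) removes duplicates; dict.fromkeys keeps first occurrences
deduplicated = list(dict.fromkeys(["Last", "Week", "Last", "Tonight"]))

print(words)
print(has_hbo)
print(without_presidency)
print(deduplicated)
```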

### UDF

In [117]:
# We can use our own Python functions to create new columns

from pyspark.sql.functions import udf          # user-defined functions
from pyspark.sql.types import IntegerType

In [121]:
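# returnType declares the Spark type of the values the function returns

def square(x):
    return int(x**2)

# square can be passed directly; wrapping it in a lambda is not needed
square_udf = udf(f          = square,
                 returnType = IntegerType()
                )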

df.select("dislikes",
          square_udf("dislikes").alias("dislikes**2")).where(col("dislikes").isNotNull()).toPandas().info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 40949 entries, 0 to 40948
Data columns (total 2 columns):
 #   Column       Non-Null Count  Dtype
---  ------       --------------  -----
 0   dislikes     40949 non-null  int32
 1   dislikes**2  40949 non-null  int32
dtypes: int32(2)
memory usage: 320.0 KB


In [122]:
(
    df
    .filter(col('dislikes').isNotNull()) # filter out the null values
    .withColumn('dislikes**2', square_udf('dislikes')) # create a new column with the udf
    .select('dislikes', 'dislikes**2')
).show(5)

+--------+-----------+
|dislikes|dislikes**2|
+--------+-----------+
|    2966|    8797156|
|    6146|   37773316|
|    5339|   28504921|
|     666|     443556|
|    1989|    3956121|
+--------+-----------+
only showing top 5 rows



In [None]:
# Without the .isNotNull() filter this fails: the column has nulls,
# which reach the Python function as None, and None**2 raises a TypeError
# df.select("dislikes", square_udf("dislikes")).show(5)
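
An alternative to filtering beforehand is to make the Python function itself tolerate missing values. The sketch below shows a null-safe variant in plain Python; `square_or_none` is a hypothetical name. Wrapped with `udf(..., returnType = IntegerType())` as in the cells above, a returned `None` should appear as null in the resulting column.

```python
from typing import Optional

def square_or_none(x: Optional[int]) -> Optional[int]:
    """Return x squared, or None when the input is missing."""
    if x is None:
        return None
    return int(x ** 2)

print(square_or_none(2966))  # 8797156
print(square_or_none(None))  # None
```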

### Aggregate Functions

In [123]:
# Same idea as .groupby() and .agg() in pandas

fifa.groupBy("Club", "Nationality").agg({"ID" : "count"}).show(1_000, truncate = False)

+-----------------------------------+--------------------+---------+
|Club                               |Nationality         |count(ID)|
+-----------------------------------+--------------------+---------+
|Juventus                           |Argentina           |1        |
|Manchester United                  |England             |11       |
|Sevilla FC                         |Denmark             |1        |
|Watford                            |Argentina           |1        |
|Burnley                            |Wales               |1        |
|Atiker Konyaspor                   |Turkey              |11       |
|Beşiktaş JK                        |Canada              |1        |
|Vitesse                            |South Africa        |1        |
|Santos Laguna                      |Uruguay             |3        |
|New York Red Bulls                 |United States       |14       |
|Rayo Vallecano                     |Portugal            |1        |
|Molde FK                         

In [124]:
(
    fifa
    .groupBy(
        "Club", 
        "Nationality"
    )
    .agg(
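        {
            # A dict keeps a single value per key, so only one aggregation
            # per column is possible here: repeating "Age" with "mean" and
            # "max" would be silently overwritten by the last entry ("min")
            "ID" : "count",
            "Age": "min",
            "Overall": "mean"
        }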
    )
).show(10, truncate = False)

+------------------+-------------+-----------------+---------+--------+
|Club              |Nationality  |avg(Overall)     |count(ID)|min(Age)|
+------------------+-------------+-----------------+---------+--------+
|Juventus          |Argentina    |89.0             |1        |24      |
|Manchester United |England      |74.0909090909091 |11       |17      |
|Sevilla FC        |Denmark      |81.0             |1        |29      |
|Watford           |Argentina    |80.0             |1        |27      |
|Burnley           |Wales        |76.0             |1        |28      |
|Atiker Konyaspor  |Turkey       |69.36363636363636|11       |21      |
|Beşiktaş JK       |Canada       |75.0             |1        |23      |
|Vitesse           |South Africa |75.0             |1        |28      |
|Santos Laguna     |Uruguay      |72.66666666666667|3        |24      |
|New York Red Bulls|United States|65.57142857142857|14       |18      |
+------------------+-------------+-----------------+---------+--
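
The output above contains only `min(Age)` because a Python dict holds one value per key: when the same key appears several times in a dict literal, the last value wins before Spark ever sees the dictionary. This can be checked without Spark:

```python
# The same key written three times: only the last value survives
aggregations = {
    "ID": "count",
    "Age": "mean",
    "Age": "max",
    "Age": "min",
    "Overall": "mean",
}

print(aggregations)       # {'ID': 'count', 'Age': 'min', 'Overall': 'mean'}
print(len(aggregations))  # 3
```

To apply several aggregations to the same column, the function-based notation (`mean('Age')`, `min('Age')`, `max('Age')`, each with its own alias) avoids this limitation.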

In [129]:
(
    fifa
    .groupBy(
        "Club", 
        "Nationality"
    )
    .agg(
        mean('Age').alias('Mean_Age'),
        min('Age').alias('Min_Age'),
        max('Age').alias('Max_Age'),
        mean('Overall').alias('Mean_Overall'),
        count('ID').alias('Count')
    )
).show(10, truncate = False)

+------------------+-------------+------------------+-------+-------+-----------------+-----+
|Club              |Nationality  |Mean_Age          |Min_Age|Max_Age|Mean_Overall     |Count|
+------------------+-------------+------------------+-------+-------+-----------------+-----+
|Juventus          |Argentina    |24.0              |24     |24     |89.0             |1    |
|Manchester United |England      |23.818181818181817|17     |35     |74.0909090909091 |11   |
|Sevilla FC        |Denmark      |29.0              |29     |29     |81.0             |1    |
|Watford           |Argentina    |27.0              |27     |27     |80.0             |1    |
|Burnley           |Wales        |28.0              |28     |28     |76.0             |1    |
|Atiker Konyaspor  |Turkey       |27.727272727272727|21     |34     |69.36363636363636|11   |
|Beşiktaş JK       |Canada       |23.0              |23     |23     |75.0             |1    |
|Vitesse           |South Africa |28.0              |28     

In [131]:
import pandas as pd

df_fifa = pd.read_csv(filepath_or_buffer = "../data/fifa19.csv")

df_fifa.groupby(["Club", "Nationality"]).agg({"ID" : "count"})

Club                 Nationality   ID
SSV Jahn Regensburg  Armenia        1
SSV Jahn Regensburg  Denmark        1
SSV Jahn Regensburg  Germany       22
SSV Jahn Regensburg  Kosovo         1
SSV Jahn Regensburg  Lithuania      1
...                  ...          ...
Śląsk Wrocław        Latvia         1
Śląsk Wrocław        Poland        20
Śląsk Wrocław        Portugal       1
Śląsk Wrocław        Serbia         1


In [132]:
# With this notation we can add .alias to the aggregated columns

fifa.groupBy("Club").agg(min(fifa.Age).alias("Min Age"),
                         max(fifa.Age).alias("Max Age")).show()

+--------------------+-------+-------+
|                Club|Min Age|Max Age|
+--------------------+-------+-------+
|             Palermo|     18|     37|
|          Göztepe SK|     17|     36|
|CD Everton de Viñ...|     18|     31|
|     Shonan Bellmare|     19|     38|
|          Sagan Tosu|     19|     34|
|  1. FC Union Berlin|     18|     32|
|               Carpi|     18|     31|
|           Puebla FC|     19|     35|
|  Argentinos Juniors|     17|     35|
|     SC Paderborn 07|     18|     36|
|       Karlsruher SC|     18|     35|
|         SC Freiburg|     19|     31|
|San Lorenzo de Al...|     19|     38|
|  SpVgg Unterhaching|     18|     39|
|Universidad Católica|     17|     33|
|         GFC Ajaccio|     18|     35|
|           FC Luzern|     18|     34|
|                 AIK|     17|     38|
|       SC Heerenveen|     17|     34|
|              Santos|     26|     34|
+--------------------+-------+-------+
only showing top 20 rows



In [134]:
# With .summary() we can get similar statistics for the selected columns
# .limit(6) keeps only the first six rows, so stddev and mean are not shown

videos.select("views", "likes", "dislikes")                                      \
      .summary("count", "min", "25%", "50%", "75%", "max", "stddev", "mean").limit(6).toPandas()

   summary  views      likes                                             dislikes
0  count    41061      41043                                             41035
1  min      Geno’s     Kendall Jenner and Kate Upton for a look at t...  D'Onofrio makes fusilli al ferretto
2  25%      242240.0   5417.0                                            202.0
3  50%      681439.0   18084.0                                           631.0
4  75%      1822798.0  55405.0                                           1937.0
5  max      99999      99990                                             9993
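
The `min` and `max` rows above look inconsistent with the percentiles: the maximum number of views (99999) is smaller than the 75th percentile, and the minimum is a piece of text. This suggests, as an interpretation rather than something the notebook states, that these columns of `videos` are stored as strings, so `min` and `max` compare them alphabetically. Plain Python shows the same effect with a few of the values from the table:

```python
values = ["242240", "681439", "1822798", "99999"]

max_as_text = max(values)                      # alphabetical comparison
max_as_number = max(int(v) for v in values)    # numeric comparison

print(max_as_text)    # 99999
print(max_as_number)  # 1822798
```

Casting the columns to a numeric type before calling `.summary()` would be the usual way to get numeric extremes.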


### Joins

In [None]:
titanic1 = spark.read.csv(path = "../data/titanic 1.csv",
                          inferSchema = True, header = True)

titanic2 = spark.read.csv(path = "../data/titanic 2.csv",
                          inferSchema = True, header = True)

In [None]:
titanic1.limit(3).toPandas()

In [None]:
titanic2.limit(3).toPandas()

In [None]:
# .union works like pd.concat with axis = 0: it appends rows
# Both DataFrames must have the same number of columns (they are matched by position, not by name)
# Duplicates are kept, so the union of titanic1 with itself doubles the row count

titanic = titanic1.union(titanic1)

print(titanic1.count())
print(titanic.count())

In [None]:
# Inner Joins
titanic = titanic1.join(other = titanic2, on = ["PassengerId"], how = "inner")

titanic.show()

### Missing Values

In [None]:
# Filter with isNull()

titanic.select(["Name", "PassengerId", "Age"]).filter(titanic.Age.isNull()).show(5)

In [None]:
# This function reports, for each column with nulls, how many rows are null and what percentage they represent

from pyspark.sql.functions import *

def null_value_calc(df):
    null_columns_counts = list()
    numRows = df.count()
    
    for k in df.columns:
        nullRows = df.where(col(k).isNull()).count()
        
        if (nullRows > 0):
            temp = k, nullRows, (nullRows / numRows)*100
            null_columns_counts.append(temp)
            
    return null_columns_counts

null_columns_calc_list = null_value_calc(titanic)

null_columns_calc_list

In [None]:
spark.createDataFrame(data = null_columns_calc_list,
                      schema = ["Name", "Count", "Percent"]).show()

In [None]:
# df.na.drop() = df.dropna()

titanic.na.drop().limit(6).toPandas()

In [None]:
# .na.drop() with no parameters: drops every row that has at least one null

og_len = titanic.count()
drop_len = titanic.na.drop().count()

print("Rows dropped", og_len - drop_len)
print("Percentage of rows dropped", (og_len - drop_len)/og_len*100)

In [None]:
# .na.drop() with thresh = 8: keeps only the rows with at least 8 non-null values

og_len = titanic.count()
drop_len = titanic.na.drop(thresh = 8).count()

print("Rows dropped", og_len - drop_len)
print("Percentage of rows dropped", (og_len - drop_len)/og_len*100)

In [None]:
# .na.drop() with thresh = 6: keeps only the rows with at least 6 non-null values

og_len = titanic.count()
drop_len = titanic.na.drop(thresh = 6).count()

print("Rows dropped", og_len - drop_len)
print("Percentage of rows dropped", (og_len - drop_len)/og_len*100)
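
The `thresh` parameter is easy to misread: it is the minimum number of non-null values a row needs in order to be kept, not the number of nulls that triggers a drop. A small plain-Python emulation with made-up rows (not the Titanic data) illustrates the rule; `drop_na` is a hypothetical helper.

```python
# Toy rows invented for illustration; None plays the role of null
rows = [
    (1, "Braund", 22.0, "S"),   # 4 non-null values
    (2, "Cumings", None, "C"),  # 3 non-null values
    (3, None, None, None),      # 1 non-null value
]

def drop_na(rows, thresh):
    """Keep the rows that have at least `thresh` non-null values."""
    return [r for r in rows if sum(v is not None for v in r) >= thresh]

print(len(drop_na(rows, thresh=4)))  # 1
print(len(drop_na(rows, thresh=3)))  # 2
print(len(drop_na(rows, thresh=1)))  # 3
```

Lowering `thresh` therefore keeps more rows, which is why the cell with `thresh = 6` can never drop more rows than the one with `thresh = 8`.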

In [None]:
# .na.drop() with subset: only nulls in the listed columns cause a row to be dropped

og_len = titanic.count()
drop_len = titanic.na.drop(subset = ["Age"]).count()

print("Rows dropped", og_len - drop_len)
print("Percentage of rows dropped", (og_len - drop_len)/og_len*100)

In [None]:
# .na.drop() with how = "all": a row is dropped only if all of its values are null

og_len = titanic.count()
drop_len = titanic.na.drop(how = "all").count()

print("Rows dropped", og_len - drop_len)
print("Percentage of rows dropped", (og_len - drop_len)/og_len*100)

### Fill NaN's

In [None]:
# na.fill(value): "value" must match the dtype of the column
# Columns whose type does not match the value are left untouched

titanic.na.fill(value = 9999).limit(6).toPandas()

In [None]:
# see row 6
titanic.na.fill(value = "NO AGE").limit(6).toPandas()

In [None]:
# We can fill a specific column

titanic.na.fill(value = 9999, subset = ["Age"]).limit(6).toPandas()

In [None]:
# In one line

titanic.filter(titanic.Age.isNull()).na.fill(value = 9999, subset = ["Age"]).limit(5).toPandas()

In [None]:
# Replace the nulls of the selected columns with the column mean

def fill_with_mean(df, include = set()):
    stats = df.agg(*(avg(c).alias(c) for c in df.columns if c in include))
    
    return df.na.fill(value = stats.first().asDict())

In [None]:
updated_df = fill_with_mean(titanic, ["Age"])

In [None]:
# see row 6
updated_df.limit(6).toPandas()
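
The logic of `fill_with_mean` can be followed on a plain list: the mean is computed over the known values only (Spark's `avg` also skips nulls), and then every missing entry is replaced by it. The ages below are invented for illustration.

```python
# Toy column with missing values; None plays the role of null
ages = [22.0, None, 38.0, None, 30.0]

known = [a for a in ages if a is not None]
mean_age = sum(known) / len(known)

filled = [mean_age if a is None else a for a in ages]

print(mean_age)  # 30.0
print(filled)    # [22.0, 30.0, 38.0, 30.0, 30.0]
```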
