In [1]:
!pip install pyspark pandas numpy findspark



In [7]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
conf = SparkConf().setAppName('spark-app').setMaster('local[*]')
spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark

In [3]:
# Downloading and preprocessing Cars Data downloaded origianlly from https://perso.telecom-paristech.fr/eagan/class/igr204/datasets
!wget https://jacobceles.github.io/knowledge_repo/colab_and_pyspark/cars.csv

--2022-07-02 12:14:02--  https://jacobceles.github.io/knowledge_repo/colab_and_pyspark/cars.csv
Resolving jacobceles.github.io (jacobceles.github.io)... 185.199.108.153, 185.199.109.153, 185.199.110.153, ...
Connecting to jacobceles.github.io (jacobceles.github.io)|185.199.108.153|:443... connected.
HTTP request sent, awaiting response... 301 Moved Permanently
Location: https://jacobcelestine.com/knowledge_repo/colab_and_pyspark/cars.csv [following]
--2022-07-02 12:14:02--  https://jacobcelestine.com/knowledge_repo/colab_and_pyspark/cars.csv
Resolving jacobcelestine.com (jacobcelestine.com)... 185.199.108.153, 185.199.109.153, 185.199.110.153, ...
Connecting to jacobcelestine.com (jacobcelestine.com)|185.199.108.153|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 22608 (22K) [text/csv]
Saving to: ‘cars.csv’


2022-07-02 12:14:02 (65.6 MB/s) - ‘cars.csv’ saved [22608/22608]



In [4]:
!ls

cars.csv  clase_9_filtering.ipynb


In [5]:
df = spark.read.csv('cars.csv', header=True, sep=";")
df.show(5)

+--------------------+----+---------+------------+----------+------+------------+-----+------+
|                 Car| MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|
+--------------------+----+---------+------------+----------+------+------------+-----+------+
|Chevrolet Chevell...|18.0|        8|       307.0|     130.0| 3504.|        12.0|   70|    US|
|   Buick Skylark 320|15.0|        8|       350.0|     165.0| 3693.|        11.5|   70|    US|
|  Plymouth Satellite|18.0|        8|       318.0|     150.0| 3436.|        11.0|   70|    US|
|       AMC Rebel SST|16.0|        8|       304.0|     150.0| 3433.|        12.0|   70|    US|
|         Ford Torino|17.0|        8|       302.0|     140.0| 3449.|        10.5|   70|    US|
+--------------------+----+---------+------------+----------+------+------------+-----+------+
only showing top 5 rows



<a id='operaciones-del-marco-de-datos-en-filas'></a>
## Operaciones de DataFrame en filas

Discutiremos lo siguiente en esta sección:
1. Filtrado de Filas
2. Obtener Filas Distintas
3. Orden de filas
4. Uniones de dataframes

<a id='filtrado-filas'></a>
### Filtrado de filas

In [8]:
# Filtering rows in PySpark
total_count = df.count()
print("TOTAL RECORD COUNT: " + str(total_count)) 
europe_filtered_count = df.filter(col('Origin')=='Europe').count()
print("EUROPE FILTERED RECORD COUNT: " + str(europe_filtered_count))
df.filter(col('Origin')=='Europe').show(truncate=False)

TOTAL RECORD COUNT: 406
EUROPE FILTERED RECORD COUNT: 73
+----------------------------+----+---------+------------+----------+------+------------+-----+------+
|Car                         |MPG |Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|
+----------------------------+----+---------+------------+----------+------+------------+-----+------+
|Citroen DS-21 Pallas        |0   |4        |133.0       |115.0     |3090. |17.5        |70   |Europe|
|Volkswagen 1131 Deluxe Sedan|26.0|4        |97.00       |46.00     |1835. |20.5        |70   |Europe|
|Peugeot 504                 |25.0|4        |110.0       |87.00     |2672. |17.5        |70   |Europe|
|Audi 100 LS                 |24.0|4        |107.0       |90.00     |2430. |14.5        |70   |Europe|
|Saab 99e                    |25.0|4        |104.0       |95.00     |2375. |17.5        |70   |Europe|
|BMW 2002                    |26.0|4        |121.0       |113.0     |2234. |12.5        |70   |Europe|
|Volkswagen Supe

In [9]:
# Filtering rows in PySpark based on Multiple conditions
total_count = df.count()
print("TOTAL RECORD COUNT: " + str(total_count)) 
europe_filtered_count = df.filter((col('Origin')=='Europe') & 
                                  (col('Cylinders')==4)).count() # Two conditions added here
print("EUROPE FILTERED RECORD COUNT: " + str(europe_filtered_count))
df.filter(col('Origin')=='Europe').show(truncate=False)

TOTAL RECORD COUNT: 406
EUROPE FILTERED RECORD COUNT: 66
+----------------------------+----+---------+------------+----------+------+------------+-----+------+
|Car                         |MPG |Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|
+----------------------------+----+---------+------------+----------+------+------------+-----+------+
|Citroen DS-21 Pallas        |0   |4        |133.0       |115.0     |3090. |17.5        |70   |Europe|
|Volkswagen 1131 Deluxe Sedan|26.0|4        |97.00       |46.00     |1835. |20.5        |70   |Europe|
|Peugeot 504                 |25.0|4        |110.0       |87.00     |2672. |17.5        |70   |Europe|
|Audi 100 LS                 |24.0|4        |107.0       |90.00     |2430. |14.5        |70   |Europe|
|Saab 99e                    |25.0|4        |104.0       |95.00     |2375. |17.5        |70   |Europe|
|BMW 2002                    |26.0|4        |121.0       |113.0     |2234. |12.5        |70   |Europe|
|Volkswagen Supe

<a id='obtener-filas-distintas'></a>
### Obtener filas distintas

In [10]:
#Get Unique Rows in PySpark
df.select('Origin').distinct().show()

+------+
|Origin|
+------+
|Europe|
|    US|
| Japan|
+------+



In [11]:
#Get Unique Rows in PySpark based on mutliple columns
df.select('Origin','model').distinct().show()

+------+-----+
|Origin|model|
+------+-----+
| Japan|   76|
|    US|   81|
|    US|   80|
|    US|   76|
| Japan|   70|
|    US|   78|
|Europe|   76|
|    US|   70|
| Japan|   75|
|Europe|   80|
| Japan|   77|
|Europe|   72|
|    US|   75|
|    US|   79|
|    US|   82|
|Europe|   75|
| Japan|   78|
|    US|   71|
| Japan|   82|
| Japan|   80|
+------+-----+
only showing top 20 rows



<a id='ordenar-filas'></a>
### Orden de filas

In [12]:
# Sort Rows in PySpark
# By default the data will be sorted in ascending order
df.orderBy('Cylinders').show(truncate=False) 

+----------------------------+----+---------+------------+----------+------+------------+-----+------+
|Car                         |MPG |Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|
+----------------------------+----+---------+------------+----------+------+------------+-----+------+
|Mazda RX-4                  |21.5|3        |80.00       |110.0     |2720. |13.5        |77   |Japan |
|Mazda RX-7 GS               |23.7|3        |70.00       |100.0     |2420. |12.5        |80   |Japan |
|Mazda RX2 Coupe             |19.0|3        |70.00       |97.00     |2330. |13.5        |72   |Japan |
|Mazda RX3                   |18.0|3        |70.00       |90.00     |2124. |13.5        |73   |Japan |
|Datsun 510 (sw)             |28.0|4        |97.00       |92.00     |2288. |17.0        |72   |Japan |
|Opel 1900                   |28.0|4        |116.0       |90.00     |2123. |14.0        |71   |Europe|
|Mercury Capri 2000          |23.0|4        |122.0       |86.00     |2220

In [13]:
# To change the sorting order, you can use the ascending parameter
df.orderBy('Cylinders', ascending=False).show(truncate=False) 

+-------------------------+----+---------+------------+----------+------+------------+-----+------+
|Car                      |MPG |Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|
+-------------------------+----+---------+------------+----------+------+------------+-----+------+
|Plymouth 'Cuda 340       |14.0|8        |340.0       |160.0     |3609. |8.0         |70   |US    |
|Pontiac Safari (sw)      |13.0|8        |400.0       |175.0     |5140. |12.0        |71   |US    |
|Ford Mustang Boss 302    |0   |8        |302.0       |140.0     |3353. |8.0         |70   |US    |
|Buick Skylark 320        |15.0|8        |350.0       |165.0     |3693. |11.5        |70   |US    |
|Chevrolet Monte Carlo    |15.0|8        |400.0       |150.0     |3761. |9.5         |70   |US    |
|AMC Rebel SST            |16.0|8        |304.0       |150.0     |3433. |12.0        |70   |US    |
|Buick Estate Wagon (sw)  |14.0|8        |455.0       |225.0     |3086. |10.0        |70   |US    |


In [14]:
# Using groupBy aand orderBy together
df.groupBy("Origin").count().orderBy('count', ascending=False).show(10)

+------+-----+
|Origin|count|
+------+-----+
|    US|  254|
| Japan|   79|
|Europe|   73|
+------+-----+



<a id='union-dataframes'></a>
### Uniones de dataframes

Verá tres métodos principales para realizar la unión de dataframes. Es importante saber la diferencia entre ellos y cuál es el preferido:

* `union()`: se utiliza para fusionar dos DataFrames de la misma estructura/esquema. Si los esquemas no son iguales, devuelve un error.
* `unionAll()`: esta función está en desuso desde Spark 2.0.0 y se reemplazó con union()
* `unionByName()`: esta función se usa para fusionar dos dataframes según el nombre de la columna.

> Dado que `unionAll()` está en desuso, **`union()` es el método preferido para fusionar dataframes.**
<br>
> La diferencia entre `unionByName()` y `union()` es que `unionByName()` resuelve las columnas por nombre, no por posición.

En otros SQL, Union elimina los duplicados, pero UnionAll fusiona dos conjuntos de datos, por lo que incluye registros duplicados. Pero, en PySpark, ambos se comportan igual e incluyen registros duplicados. La recomendación es utilizar `distinct()` o `dropDuplicates()` para eliminar registros duplicados.

In [18]:
# CASE 1: Union When columns are in order
df = spark.read.csv('cars.csv', header=True, sep=";", inferSchema=True)
europe_cars = df.filter((col('Origin')=='Europe') & (col('Cylinders')==5))
europe_cars.show()

+-------------------+----+---------+------------+----------+------+------------+-----+------+
|                Car| MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|
+-------------------+----+---------+------------+----------+------+------------+-----+------+
|          Audi 5000|20.3|        5|       131.0|     103.0|  2830|        15.9|   78|Europe|
| Mercedes Benz 300d|25.4|        5|       183.0|      77.0|  3530|        20.1|   79|Europe|
|Audi 5000s (diesel)|36.4|        5|       121.0|      67.0|  2950|        19.9|   80|Europe|
+-------------------+----+---------+------------+----------+------+------------+-----+------+



In [20]:
japan_cars = df.filter((col('Origin')=='Japan') & (col('Cylinders')==3))
japan_cars.show()

+---------------+----+---------+------------+----------+------+------------+-----+------+
|            Car| MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|
+---------------+----+---------+------------+----------+------+------------+-----+------+
|Mazda RX2 Coupe|19.0|        3|        70.0|      97.0|  2330|        13.5|   72| Japan|
|      Mazda RX3|18.0|        3|        70.0|      90.0|  2124|        13.5|   73| Japan|
|     Mazda RX-4|21.5|        3|        80.0|     110.0|  2720|        13.5|   77| Japan|
|  Mazda RX-7 GS|23.7|        3|        70.0|     100.0|  2420|        12.5|   80| Japan|
+---------------+----+---------+------------+----------+------+------------+-----+------+



In [21]:
print("EUROPE CARS: "+str(europe_cars.count()))
print("JAPAN CARS: "+str(japan_cars.count()))
print("AFTER UNION: "+str(europe_cars.union(japan_cars).count()))

EUROPE CARS: 3
JAPAN CARS: 4
AFTER UNION: 7


In [23]:
europe_cars.union(japan_cars).show()

+-------------------+----+---------+------------+----------+------+------------+-----+------+
|                Car| MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|
+-------------------+----+---------+------------+----------+------+------------+-----+------+
|          Audi 5000|20.3|        5|       131.0|     103.0|  2830|        15.9|   78|Europe|
| Mercedes Benz 300d|25.4|        5|       183.0|      77.0|  3530|        20.1|   79|Europe|
|Audi 5000s (diesel)|36.4|        5|       121.0|      67.0|  2950|        19.9|   80|Europe|
|    Mazda RX2 Coupe|19.0|        3|        70.0|      97.0|  2330|        13.5|   72| Japan|
|          Mazda RX3|18.0|        3|        70.0|      90.0|  2124|        13.5|   73| Japan|
|         Mazda RX-4|21.5|        3|        80.0|     110.0|  2720|        13.5|   77| Japan|
|      Mazda RX-7 GS|23.7|        3|        70.0|     100.0|  2420|        12.5|   80| Japan|
+-------------------+----+---------+------------+----------+

**Resultado:**

> Como puede ver aquí, había 3 autos de Europa con 5 Cilindros y 4 autos de Japón con 3 Cilindros. Después de la unión, hay 7 autos en total.

In [24]:
# CASE 1: Union When columns are not in order
# Creating two dataframes with jumbled columns
df1 = spark.createDataFrame([[1, 2, 3]], ["col0", "col1", "col2"])
df2 = spark.createDataFrame([[4, 5, 6]], ["col1", "col2", "col0"])
df1.unionByName(df2).show()

+----+----+----+
|col0|col1|col2|
+----+----+----+
|   1|   2|   3|
|   6|   4|   5|
+----+----+----+



**Resultado:**

> Como puede ver aquí, los dos dataframes se han fusionado con éxito en función de sus nombres de columna.

<a id='funciones-de-manipulación-de-datos-comunes'></a>
## Funciones comunes de manipulación de datos

In [25]:
# Functions available in PySpark
from pyspark.sql import functions
# Similar to python, we can use the dir function to view the avaiable functions
print(dir(functions)) 



<a id='string-functions'></a>
### String Functions

In [26]:
# Loading the data
from pyspark.sql.functions import col
df = spark.read.csv('cars.csv', header=True, sep=";", inferSchema=True)

**Mostrar la columna Car en minúscula y mayúscula, y los primeros 4 caracteres de la columna**

In [33]:
from pyspark.sql.functions import col,lower, upper, substring, lit
# Prints out the details of a function
help(substring)


Help on function substring in module pyspark.sql.functions:

substring(str, pos, len)
    Substring starts at `pos` and is of length `len` when str is String type or
    returns the slice of byte array that starts at `pos` in byte and is of length `len`
    when str is Binary type.
    
    .. versionadded:: 1.5.0
    
    Notes
    -----
    The position is not zero based, but 1 based index.
    
    Examples
    --------
    >>> df = spark.createDataFrame([('abcd',)], ['s',])
    >>> df.select(substring(df.s, 1, 2).alias('s')).collect()
    [Row(s='ab')]



In [34]:
# alias is used to rename the column name in the output
df.select(col('Car'),lower(col('Car')),upper(col('Car')),substring(col('Car'),1,4).alias("concatenated value")).show(5, False)

+-------------------------+-------------------------+-------------------------+------------------+
|Car                      |lower(Car)               |upper(Car)               |concatenated value|
+-------------------------+-------------------------+-------------------------+------------------+
|Chevrolet Chevelle Malibu|chevrolet chevelle malibu|CHEVROLET CHEVELLE MALIBU|Chev              |
|Buick Skylark 320        |buick skylark 320        |BUICK SKYLARK 320        |Buic              |
|Plymouth Satellite       |plymouth satellite       |PLYMOUTH SATELLITE       |Plym              |
|AMC Rebel SST            |amc rebel sst            |AMC REBEL SST            |AMC               |
|Ford Torino              |ford torino              |FORD TORINO              |Ford              |
+-------------------------+-------------------------+-------------------------+------------------+
only showing top 5 rows



**Concatene la columna Car y Model y agregue un espacio entre ellas.**

In [35]:
from pyspark.sql.functions import concat
df.select(col("Car"),col("model"),concat(col("Car"), lit(" "), col("model"))).show(5, False)

+-------------------------+-----+----------------------------+
|Car                      |model|concat(Car,  , model)       |
+-------------------------+-----+----------------------------+
|Chevrolet Chevelle Malibu|70   |Chevrolet Chevelle Malibu 70|
|Buick Skylark 320        |70   |Buick Skylark 320 70        |
|Plymouth Satellite       |70   |Plymouth Satellite 70       |
|AMC Rebel SST            |70   |AMC Rebel SST 70            |
|Ford Torino              |70   |Ford Torino 70              |
+-------------------------+-----+----------------------------+
only showing top 5 rows



<a id='funciones-numéricas'></a>
### Funciones numéricas

**Mostrar el carro más liviano y el carro más pesado**

In [38]:
from pyspark.sql.functions import min, max
df.select(min(col('Weight')), max(col('Weight'))).show()

+-----------+-----------+
|min(Weight)|max(Weight)|
+-----------+-----------+
|       1613|       5140|
+-----------+-----------+



**Suma 10 al peso mínimo y máximo**from pyspark.sql.functions import min, max, lit
df.select(min(col('Weight'))+lit(10), max(col('Weight')+lit(10))).show()

In [39]:
from pyspark.sql.functions import min, max, lit
df.select(min(col('Weight'))+lit(10), max(col('Weight')+lit(10))).show()

+------------------+------------------+
|(min(Weight) + 10)|max((Weight + 10))|
+------------------+------------------+
|              1623|              5150|
+------------------+------------------+



<a id='operaciones-en-fecha'></a>
### Operaciones con fechas

> [PySpark sigue la tabla SimpleDateFormat de Java. Haga clic aquí para ver los documentos.](https://docs.oracle.com/javase/7/docs/api/java/text/SimpleDateFormat.html)

In [42]:
from pyspark.sql.functions import to_date, to_timestamp, lit
df = spark.createDataFrame([('2019-12-25 13:30:00',)], ['DOB'])
df.show()


+-------------------+
|                DOB|
+-------------------+
|2019-12-25 13:30:00|
+-------------------+



In [43]:
df.printSchema()

root
 |-- DOB: string (nullable = true)



In [44]:
df = spark.createDataFrame([('2019-12-25 13:30:00',)], ['DOB'])
df = df.select(to_date(col('DOB'),'yyyy-MM-dd HH:mm:ss'), to_timestamp(col('DOB'),'yyyy-MM-dd HH:mm:ss'))
df.show()
df.printSchema()

+---------------------------------+--------------------------------------+
|to_date(DOB, yyyy-MM-dd HH:mm:ss)|to_timestamp(DOB, yyyy-MM-dd HH:mm:ss)|
+---------------------------------+--------------------------------------+
|                       2019-12-25|                   2019-12-25 13:30:00|
+---------------------------------+--------------------------------------+

root
 |-- to_date(DOB, yyyy-MM-dd HH:mm:ss): date (nullable = true)
 |-- to_timestamp(DOB, yyyy-MM-dd HH:mm:ss): timestamp (nullable = true)



**¿Qué día es 3 días antes de la fecha más antigua y 3 días después de la fecha más reciente?**

In [46]:
from pyspark.sql.functions import date_add, date_sub
# create a dummy dataframe
df = spark.createDataFrame([('1990-01-01',),('1995-01-03',),('2021-03-30',)], ['Date'])
# find out the required dates
df.select(date_add(max(col('Date')),3), date_sub(min(col('Date')),3)).show()

+----------------------+----------------------+
|date_add(max(Date), 3)|date_sub(min(Date), 3)|
+----------------------+----------------------+
|            2021-04-02|            1989-12-29|
+----------------------+----------------------+



<a id='joins-in-pyspark'></a>
## Joins en PySpark

In [47]:
# Create two dataframes
cars_df = spark.createDataFrame([[1, 'Car A'],[2, 'Car B'],[3, 'Car C']], ["id", "car_name"])
car_price_df = spark.createDataFrame([[1, 1000],[2, 2000],[3, 3000]], ["id", "car_price"])
cars_df.show()
car_price_df.show()

+---+--------+
| id|car_name|
+---+--------+
|  1|   Car A|
|  2|   Car B|
|  3|   Car C|
+---+--------+

+---+---------+
| id|car_price|
+---+---------+
|  1|     1000|
|  2|     2000|
|  3|     3000|
+---+---------+



In [48]:
# Executing an inner join so we can see the id, name and price of each car in one row
cars_df.join(car_price_df, cars_df.id == car_price_df.id, 'inner').select(cars_df['id'],cars_df['car_name'],car_price_df['car_price']).show(truncate=False)

+---+--------+---------+
|id |car_name|car_price|
+---+--------+---------+
|1  |Car A   |1000     |
|2  |Car B   |2000     |
|3  |Car C   |3000     |
+---+--------+---------+



Como puede ver, hemos realizado una unión interna entre dos dataframes. Las siguientes uniones son compatibles con PySpark:
1. inner (default)
2. cross
3. outer
4. full
5. full_outer
6. left
7. left_outer
8. right
9. right_outer
10. left_semi
11. left_anti

<a id='spark-sql'></a>
## Spark SQL

SQL ha existido desde la década de 1970, por lo que uno puede imaginar la cantidad de personas que lo convirtieron en su día a día. A medida que el big data se hizo popular, la cantidad de profesionales con el conocimiento técnico para manejarlo fue escaso. Esto condujo a la creación de Spark SQL. Para citar los documentos:<br>
>Spark SQL es un módulo de Spark para el procesamiento de datos estructurados. A diferencia de la API básica de Spark RDD, las interfaces proporcionadas por Spark SQL brindan a Spark más información sobre la estructura de los datos y el cálculo que se está realizando. Internamente, Spark SQL usa esta información adicional para realizar optimizaciones adicionales.

Básicamente, lo que necesita saber es que Spark SQL se usa para ejecutar consultas SQL en big data. Spark SQL también se puede usar para leer datos de tablas y vistas de Hive. Permítanme explicar Spark SQL con un ejemplo.

In [50]:
# Load data
df = spark.read.csv('cars.csv', header=True, sep=";")
# Register Temporary Table
df.createOrReplaceTempView("temp")
# Select all data from temp table
spark.sql("select * from temp limit 5").show()
# Select count of data in table
spark.sql("select count(*) as total_count from temp").show()

+--------------------+----+---------+------------+----------+------+------------+-----+------+
|                 Car| MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|
+--------------------+----+---------+------------+----------+------+------------+-----+------+
|Chevrolet Chevell...|18.0|        8|       307.0|     130.0| 3504.|        12.0|   70|    US|
|   Buick Skylark 320|15.0|        8|       350.0|     165.0| 3693.|        11.5|   70|    US|
|  Plymouth Satellite|18.0|        8|       318.0|     150.0| 3436.|        11.0|   70|    US|
|       AMC Rebel SST|16.0|        8|       304.0|     150.0| 3433.|        12.0|   70|    US|
|         Ford Torino|17.0|        8|       302.0|     140.0| 3449.|        10.5|   70|    US|
+--------------------+----+---------+------------+----------+------+------------+-----+------+

+-----------+
|total_count|
+-----------+
|        406|
+-----------+



Como puede ver, registramos el dataframe como una tabla temporal y luego ejecutamos consultas SQL básicas en él. ¡¿Qué tan asombroso es eso?!<br>
Si usted es una persona que se siente más cómoda con SQL, ¡entonces esta característica es realmente una bendición para usted! Pero esto plantea una pregunta:
> *¿Debería seguir usando Spark SQL todo el tiempo?*

Y la respuesta es, _**depende**_.<br>
Básicamente, las diferentes funciones actúan de diferentes maneras y, según el tipo de acción que intente realizar, la velocidad a la que completa la ejecución también difiere. Pero a medida que pasa el tiempo, esta función mejora cada vez más, por lo que es de esperar que la diferencia sea un pequeño margen. Hay muchos análisis realizados sobre esto, pero nada tiene una respuesta definitiva todavía. Puede leer este [estudio comparativo realizado por Horton Works](https://community.cloudera.com/t5/Community-Articles/Spark-RDDs-vs-DataFrames-vs-SparkSQL/ta-p/246547) o la respuesta a esta [pregunta de stackoverflow] (https://stackoverflow.com/questions/45430816/writing-sql-vs-using-dataframe-apis-in-spark-sql) si todavía tiene curiosidad al respecto.

<a id='rdd'></a>
## RDD

> Con map, defines una función y luego la aplicas registro por registro. Flatmap devuelve un nuevo RDD aplicando primero una función a todos los elementos en los RDD y luego aplanando el resultado. Filtro, devuelve un nuevo RDD. Es decir, sólo los elementos que satisfacen una condición. Con reduce, estamos tomando elementos vecinos y produciendo un único resultado combinado.
Por ejemplo, supongamos que tiene un conjunto de números. Puede reducir esto a su suma proporcionando una función que toma como entrada dos valores y los reduce a uno.

Algunas de las razones por las que usaría un dataframe sobre RDD son:
1. Es la capacidad de representar datos como filas y columnas. Pero esto también significa que solo puede contener datos estructurados y semiestructurados.
2. Permite procesar datos en diferentes formatos (AVRO, CSV, JSON, y sistema de almacenamiento HDFS, tablas HIVE, MySQL).
3. Es una capacidad superior de optimización de trabajos.
4. La API de DataFrame es muy fácil de usar.

In [54]:
cars = spark.sparkContext.textFile('cars.csv')
print(cars.first())


Car;MPG;Cylinders;Displacement;Horsepower;Weight;Acceleration;Model;Origin


In [55]:
cars_header = cars.first()
cars_rest = cars.filter(lambda line: line!=cars_header)
print(cars_rest.first())

Chevrolet Chevelle Malibu;18.0;8;307.0;130.0;3504.;12.0;70;US


**¿Cuántos autos hay en nuestro csv?**

In [57]:
cars_rest.map(lambda line: line.split(";")).count()

406

**Muestra el nombre del coche, MPG, cilindros, peso y origen de los coches con origen en Europa**

In [62]:
# Car name is column  0
(cars_rest.filter(lambda line: line.split(";")[8]=='Europe').
 map(lambda line: (line.split(";")[0],
    line.split(";")[1],
    line.split(";")[2],
    line.split(";")[5],
    line.split(";")[8])).collect())

[('Citroen DS-21 Pallas', '0', '4', '3090.', 'Europe'),
 ('Volkswagen 1131 Deluxe Sedan', '26.0', '4', '1835.', 'Europe'),
 ('Peugeot 504', '25.0', '4', '2672.', 'Europe'),
 ('Audi 100 LS', '24.0', '4', '2430.', 'Europe'),
 ('Saab 99e', '25.0', '4', '2375.', 'Europe'),
 ('BMW 2002', '26.0', '4', '2234.', 'Europe'),
 ('Volkswagen Super Beetle 117', '0', '4', '1978.', 'Europe'),
 ('Opel 1900', '28.0', '4', '2123.', 'Europe'),
 ('Peugeot 304', '30.0', '4', '2074.', 'Europe'),
 ('Fiat 124B', '30.0', '4', '2065.', 'Europe'),
 ('Volkswagen Model 111', '27.0', '4', '1834.', 'Europe'),
 ('Volkswagen Type 3', '23.0', '4', '2254.', 'Europe'),
 ('Volvo 145e (sw)', '18.0', '4', '2933.', 'Europe'),
 ('Volkswagen 411 (sw)', '22.0', '4', '2511.', 'Europe'),
 ('Peugeot 504 (sw)', '21.0', '4', '2979.', 'Europe'),
 ('Renault 12 (sw)', '26.0', '4', '2189.', 'Europe'),
 ('Volkswagen Super Beetle', '26.0', '4', '1950.', 'Europe'),
 ('Fiat 124 Sport Coupe', '26.0', '4', '2265.', 'Europe'),
 ('Fiat 128', '29

**Muestra el nombre del auto, MPG, cilindros, peso y origen de los autos que se originan en Europa o Japón**

In [63]:
# Car name is column  0
(cars_rest.filter(lambda line: line.split(";")[8] in ['Europe','Japan']).
 map(lambda line: (line.split(";")[0],
    line.split(";")[1],
    line.split(";")[2],
    line.split(";")[5],
    line.split(";")[8])).collect())

[('Citroen DS-21 Pallas', '0', '4', '3090.', 'Europe'),
 ('Toyota Corolla Mark ii', '24.0', '4', '2372.', 'Japan'),
 ('Datsun PL510', '27.0', '4', '2130.', 'Japan'),
 ('Volkswagen 1131 Deluxe Sedan', '26.0', '4', '1835.', 'Europe'),
 ('Peugeot 504', '25.0', '4', '2672.', 'Europe'),
 ('Audi 100 LS', '24.0', '4', '2430.', 'Europe'),
 ('Saab 99e', '25.0', '4', '2375.', 'Europe'),
 ('BMW 2002', '26.0', '4', '2234.', 'Europe'),
 ('Datsun PL510', '27.0', '4', '2130.', 'Japan'),
 ('Toyota Corolla', '25.0', '4', '2228.', 'Japan'),
 ('Volkswagen Super Beetle 117', '0', '4', '1978.', 'Europe'),
 ('Opel 1900', '28.0', '4', '2123.', 'Europe'),
 ('Peugeot 304', '30.0', '4', '2074.', 'Europe'),
 ('Fiat 124B', '30.0', '4', '2065.', 'Europe'),
 ('Toyota Corolla 1200', '31.0', '4', '1773.', 'Japan'),
 ('Datsun 1200', '35.0', '4', '1613.', 'Japan'),
 ('Volkswagen Model 111', '27.0', '4', '1834.', 'Europe'),
 ('Toyota Corolla Hardtop', '24.0', '4', '2278.', 'Japan'),
 ('Volkswagen Type 3', '23.0', '4', '

<a id='user-defined-functions-udf'></a>
## User-Defined Functions (UDF)

<a id='creating-dataframes'></a>
## Creando Dataframes

Al comenzar con los dataframes, la pregunta más común es: *'¿Cómo creo un dataframes?'* <br>
A continuación, puede ver cómo crear tres tipos de dataframes:

### Crear un dataframes totalmente vacío

In [68]:
from pyspark.sql.types import StructType
from pyspark.sql.types import *
sc = spark.sparkContext
#Create empty df
schema = StructType([])
empty = spark.createDataFrame(sc.emptyRDD(), schema)
empty.show()

++
||
++
++



### Crear un dataframes totalmente vacío con encabezados

In [70]:
from pyspark.sql.types import StructType, StructField
#Create empty df with header
schema_header = StructType([StructField("name", StringType(), True)])
empty_with_header = spark.createDataFrame(sc.emptyRDD(), schema_header)
empty_with_header.show()

+----+
|name|
+----+
+----+



### Crear un dataframes con datos

In [71]:
from pyspark.sql import Row
mylist = [
  {"name":'Alice',"age":13},
  {"name":'Jacob',"age":24},
  {"name":'Betty',"age":135},
]
spark.createDataFrame(Row(**x) for x in mylist).show()

+-----+---+
| name|age|
+-----+---+
|Alice| 13|
|Jacob| 24|
|Betty|135|
+-----+---+



In [72]:
# You can achieve the same using this - note that we are using spark context here, not a spark session
from pyspark.sql import Row
df = sc.parallelize([
        Row(name='Alice', age=13),
        Row(name='Jacob', age=24),
        Row(name='Betty', age=135)]).toDF()
df.show()

+-----+---+
| name|age|
+-----+---+
|Alice| 13|
|Jacob| 24|
|Betty|135|
+-----+---+



In [73]:
sc.stop()