<div style="padding:30px; color: white; background-color: #0071CD">
    <center>
        <img height="75" width="310" src="img/logoub.jpeg"></img>
    <p>
        <h1>Parallel Programming</h1>
        <h2>Spark I</h2>
        <h3>Manuel Ernesto Martínez Martín</h3>
        <h3>Josep Manuel Lopez Camuñas</h3>
    </p>
    </center>
</div>

## EXTRA

### macOS installation

#### You also need Java and Python

#### Spark installation
- brew install apache-spark

#### Add the following to your "~/.bashrc" (or "~/.zshrc" if you use zsh), where X.Y.Z is your installed version (check it with "brew info apache-spark")
- export SPARK_HOME="/usr/local/Cellar/apache-spark/X.Y.Z/libexec/"
- export PYSPARK_DRIVER_PYTHON=jupyter
- export PYSPARK_DRIVER_PYTHON_OPTS=notebook

#### You can now start a Jupyter notebook by running "pyspark"

## Imports

In [69]:
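from pyspark.sql import SparkSession
# Note: importing max, min and sum this way shadows Python's built-in functions of the same name
from pyspark.sql.functions import expr, max, min, avg, col, sum, asc, desc
spark = SparkSession.builder.getOrCreate()  # reuses the session already created by the `pyspark` launcher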

## Tutorial 1

### Loading the file

In [2]:
df = spark.read.format("csv").option("header", "true").option("nullValue","NA").option("inferSchema", "true").load("Spark_Tutorial1/2007.csv")

### Show the schema and the number of partitions

In [3]:
print("Schema:")
df.printSchema()
print("-"*60)
print("Partitions:")
print(df.rdd.getNumPartitions())

Schema:
root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: integer (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: integer (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: integer (nullable = true)
 |-- CRSElapsedTime: integer (nullable = true)
 |-- AirTime: integer (nullable = true)
 |-- ArrDelay: integer (nullable = true)
 |-- DepDelay: integer (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- TaxiIn: integer (nullable = true)
 |-- TaxiOut: integer (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)

### Dropping and selecting columns

In [4]:
df2 = df.drop("FlightNum", "TailNum", "UniqueCarrier")  # remove columns we do not need
df2 = df2.select("Origin", "Dest", "ArrDelay", "DepDelay")  # select from df2 so the drop above is not discarded
df2.show()

+------+----+--------+--------+
|Origin|Dest|ArrDelay|DepDelay|
+------+----+--------+--------+
|   SMF| ONT|       1|       7|
|   SMF| PDX|       8|      13|
|   SMF| PDX|      34|      36|
|   SMF| PDX|      26|      30|
|   SMF| PDX|      -3|       1|
|   SMF| PDX|       3|      10|
|   SMF| PHX|      47|      56|
|   SMF| PHX|      -2|       9|
|   SMF| PHX|      44|      47|
|   SMF| PHX|      -7|       3|
|   SMF| PHX|     -11|       1|
|   SMF| PHX|      52|      52|
|   SMF| SAN|      45|      53|
|   SMF| SAN|     -17|      -5|
|   SMF| SAN|      -5|       6|
|   SMF| SAN|      33|      44|
|   SMF| SAN|      -9|       0|
|   SMF| SAN|      -7|       2|
|   SMF| SAN|     -11|       1|
|   SMF| SAN|      36|      29|
+------+----+--------+--------+
only showing top 20 rows



### Drop rows containing NA values (such as in ArrDelay or DepDelay)

In [5]:
df3 = df2.na.drop()  # drops every row that has a null in any of df2's four columns

### Adding columns

In [6]:
df4 = df3.withColumn("SumDelay", expr("ArrDelay + DepDelay"))
df4.select("DepDelay", "ArrDelay", "SumDelay").show(10)

+--------+--------+--------+
|DepDelay|ArrDelay|SumDelay|
+--------+--------+--------+
|       7|       1|       8|
|      13|       8|      21|
|      36|      34|      70|
|      30|      26|      56|
|       1|      -3|      -2|
|      10|       3|      13|
|      56|      47|     103|
|       9|      -2|       7|
|      47|      44|      91|
|       3|      -7|      -4|
+--------+--------+--------+
only showing top 10 rows



### Max, min and average

In [7]:
df4.select(max("SumDelay"),min("SumDelay"),avg("SumDelay")).show()

+-------------+-------------+-----------------+
|max(SumDelay)|min(SumDelay)|    avg(SumDelay)|
+-------------+-------------+-----------------+
|         5199|         -617|21.55425998256014|
+-------------+-------------+-----------------+



### Caching the DataFrame in memory

In [8]:
df4.cache()

DataFrame[Origin: string, Dest: string, ArrDelay: int, DepDelay: int, SumDelay: int]

### Filtering operations

In [9]:
df5 = df4.where("SumDelay < 0")
#df5.show()

print("df3 count: %d\ndf5 count: %d" % (df3.count(), df5.count()))

df3 count: 7275288
df5 count: 3676937


In [10]:
df5 = df4.where("SumDelay < 0").where("Origin == 'JFK'")
#df5.show()

print("df5 count: %d"%(df5.count()))

df5 count: 53321


#### The same filter written another way (Python variables can be used here)

In [11]:
i = 0
city = "JFK"
df5 = df4.filter(col("SumDelay") < i).filter(col("Origin") == city)
#df5.show()

print("df5 count: %d"%(df5.count()))

df5 count: 53321


### Exercise 1

In [12]:
df5 = df4.where("Origin == 'JFK'")
df5.select(sum("SumDelay"),max("SumDelay"),min("SumDelay"),avg("SumDelay")).show()

+-------------+-------------+-------------+-----------------+
|sum(SumDelay)|max(SumDelay)|min(SumDelay)|    avg(SumDelay)|
+-------------+-------------+-------------+-----------------+
|      4290965|         3111|          -90|35.16175687302823|
+-------------+-------------+-------------+-----------------+



### Sorting

#### Ascending

In [13]:
df5 = df4.sort(asc("SumDelay"))
df5.show()

+------+----+--------+--------+--------+
|Origin|Dest|ArrDelay|DepDelay|SumDelay|
+------+----+--------+--------+--------+
|   AKN| ANC|    -312|    -305|    -617|
|   CHA| ATL|    -175|    -165|    -340|
|   ANC| FAI|    -162|    -165|    -327|
|   AUS| ATL|    -132|    -124|    -256|
|   ATL| AVL|    -116|    -111|    -227|
|   ANC| SEA|     -13|    -168|    -181|
|   SAN| OKC|    -157|     -19|    -176|
|   HNL| KOA|     -89|     -82|    -171|
|   SFO| HNL|       8|    -169|    -161|
|   ADK| ANC|     -83|     -72|    -155|
|   SJC| SBA|     -82|     -67|    -149|
|   ITO| HNL|     -74|     -71|    -145|
|   ADK| ANC|     -78|     -67|    -145|
|   GNV| ATL|     -82|     -62|    -144|
|   ANC| SEA|      -5|    -137|    -142|
|   PHL| SJU|     -79|     -60|    -139|
|   ADK| ANC|     -79|     -58|    -137|
|   KOA| HNL|     -72|     -65|    -137|
|   YAK| JNU|     -73|     -64|    -137|
|   DHN| ATL|     -75|     -60|    -135|
+------+----+--------+--------+--------+
only showing top 20 rows

#### Descending

In [14]:
df5 = df4.sort(desc("SumDelay")).limit(5)
df5.show()

+------+----+--------+--------+--------+
|Origin|Dest|ArrDelay|DepDelay|SumDelay|
+------+----+--------+--------+--------+
|   PBI| DTW|    2598|    2601|    5199|
|   ALO| MSP|    1942|    1956|    3898|
|   HNL| MSP|    1848|    1831|    3679|
|   FWA| DTW|    1715|    1736|    3451|
|   FAI| MSP|    1665|    1689|    3354|
+------+----+--------+--------+--------+



### Getting unique values

In [15]:
df5 = df4.select("Origin").distinct()
print(df5.count())

304


### Exercise 2

In [16]:
print("Distinct destinations: %d" % df4.select("Dest").distinct().count())
print("Unique origin/destination combinations: %d" % df4.select("Origin", "Dest").distinct().count())

Distinct destinations: 304
Unique origin/destination combinations: 5032


### Accessing the data from Python

In [17]:
datos = df4.limit(5).collect()
print(datos)
print("-"*60)
print(datos[0])
print("-"*60)
print(datos[0][3])

[Row(Origin='SMF', Dest='ONT', ArrDelay=1, DepDelay=7, SumDelay=8), Row(Origin='SMF', Dest='PDX', ArrDelay=8, DepDelay=13, SumDelay=21), Row(Origin='SMF', Dest='PDX', ArrDelay=34, DepDelay=36, SumDelay=70), Row(Origin='SMF', Dest='PDX', ArrDelay=26, DepDelay=30, SumDelay=56), Row(Origin='SMF', Dest='PDX', ArrDelay=-3, DepDelay=1, SumDelay=-2)]
------------------------------------------------------------
Row(Origin='SMF', Dest='ONT', ArrDelay=1, DepDelay=7, SumDelay=8)
------------------------------------------------------------
7


### Writing files

In [19]:
print("Partitions: %d = number of files" % df4.rdd.getNumPartitions())
df4_one = df4.coalesce(1)  # merge into a single partition so that only one file is written
print("Partitions: %d = number of files" % df4_one.rdd.getNumPartitions())

# save() creates a directory named df4_one.csv that contains the part file(s)
df4_one.write.format("csv").option("header", "true").save("Spark_Tutorial1/df4_one.csv")

Partitions: 6 = number of files
Partitions: 1 = number of files


### Exercise 3

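In principle, the more partitions there are, the faster the job should run: when everything runs on a single machine, each partition is processed as a separate task on one core. On our dual-core machine, however, only two cores are available to run tasks in parallel, so the margin for improvement is negligible. Note also that `sort` is a lazy transformation: the two cells below only build the query plan without sorting any data, so their `%%time` results (around 10 ms each) do not measure the cost of the sort itself.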

In [74]:
%%time
df4_sorted = df4.sort(asc("SumDelay"))

CPU times: user 2.27 ms, sys: 2.08 ms, total: 4.35 ms
Wall time: 11.6 ms


In [75]:
%%time
df4_one_sorted = df4_one.sort(asc("SumDelay"))

CPU times: user 2.21 ms, sys: 1.93 ms, total: 4.14 ms
Wall time: 10.1 ms
