# Prerrequisites

Installing Spark and Apache Kafka Library in VM


---



In [None]:
# install Java8
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# download spark3.0.1
!wget -q https://apache.osuosl.org/spark/spark-3.0.1/spark-3.0.1-bin-hadoop3.2.tgz

# unzip it
!tar xf spark-3.0.1-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install py4j

# For maps
!pip install folium
!pip install plotly

Define the environment (Java & Spark homes)

---

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop3.2"
os.environ["PYSPARK_SUBMIT_ARGS"] = "--master local[*] pyspark-shell"

Starting Spark Session and print the version


---


In [None]:
import findspark
findspark.init("spark-3.0.1-bin-hadoop3.2")# SPARK_HOME

from pyspark.sql import SparkSession

# create the session
spark = SparkSession \
        .builder \
        .master("local[*]") \
        .config("spark.ui.port", "4500") \
        .getOrCreate()

spark.version

In [None]:
spark

In [None]:
# For Pandas conversion optimization
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

Creating ngrok tunnel to allow Spark UI (Optional)


In [None]:
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip -o ngrok-stable-linux-amd64.zip
!sleep 2
get_ipython().system_raw('./ngrok http 4500 &')
!curl -s http://localhost:4040/api/tunnels | python3 -c \
    "import sys, json; print(json.load(sys.stdin)['tunnels'][0]['public_url'])"

# Descargar Datasets

In [None]:
!mkdir -p /dataset
!wget -q https://github.com/masfworld/datahack_docker/raw/master/zeppelin/data/bank.csv -P /dataset
!wget -q https://github.com/masfworld/datahack_docker/raw/master/zeppelin/data/vehicles.csv -P /dataset
!wget -q https://github.com/masfworld/datahack_docker/raw/master/zeppelin/data/characters.csv -P /dataset
!wget -q https://github.com/masfworld/datahack_docker/raw/master/zeppelin/data/planets.csv -P /dataset
!wget -q https://github.com/masfworld/datahack_docker/raw/master/zeppelin/data/species.csv -P /dataset
!ls /dataset

# Windows Partitioning

---



## Ejemplo 1

In [None]:
!head /dataset/bank.csv

Leyendo Datos del fichero bank.csv a un Dataframe

In [None]:
from pyspark.sql.functions import *

bank_df = spark.read.format("csv") \
  .option("sep", ";") \
  .option("inferSchema", "true") \
  .option("header", "true") \
  .load("/dataset/bank.csv")

In [None]:
bank_df.show()

**Obtén el balance de las dos personas más jóvenes por tipo de trabajo**


In [None]:
from pyspark.sql.window import Window

byJob = Window.partitionBy("job").orderBy("age")

bank_df \
  .withColumn("new_column_job", row_number().over(byJob)) \
  .filter(col("new_column_job") <= 2) \
  .select("age", "job", "balance") \
  .orderBy("job", "age") \
  .show()

## Ejercicio 1

**A partir del Dataframe formado a partir del fichero "bank.csv". 
Obtén el Top 3 de máximos balances por estado civil**


---




## Ejercicio 2



**Carga el fichero de vehicles.csv en un DataFrame**

In [None]:
!head /dataset/vehicles.csv

**Para cada uno de los vehículos, obtén la diferencia de precio (*cost_in_credits*) para cada producto con respecto al más barato en la misma clase de vehículo**


---



# Joins

## Ejercicio 3

**Crea los dataframes correspondientes para los ficheros "characters.csv" y "planets.csv". <br/>
Obtén la gravedad del planeta para cada personaje. Selecciona sólo el nombre del personaje y planeta además de su gravedad**


---




## Ejercicio 4

**Revisa el plan de ejecución del ejercicio 3. ¿Qué tipo de join se está ejecutando? ¿Por qué?**

---

**Después de revisar el plan de ejecución, ejecuta las siguientes instrucciones**

---

In [None]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", '0')

In [None]:
spark.conf.get("spark.sql.autoBroadcastJoinThreshold")

**Vuelve a ejecutar la consulta del ejercicio 3 que contiene el Join**

---

## Ejercicio 5

**Crea un DataFrame a partir del fichero de "species.csv" y reparticiona este y el DataFrame de Characters a 100 particiones**


---



## Ejercicio 6

**Obtén la clasificación de especies para cada personaje. Selecciona sólo el nombre del personaje y su clasificación de especie**<br>
Usa los datframes reparticionados


---



## Ejercicio 7

**Ejecuta la siguiente operación sobre el DataFrame del ejercicio 6 y observa la diferencia de reparto de rows entre las particiones**

---

