<a href="https://colab.research.google.com/github/curso-iabd-uclm/hadoop/blob/main/MUMADE_TADM_Spark_0_Instalaci%C3%B3n_Comenzando_con_Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Primer Cuaderno Colab Spark

# Inicialización

## Instalación

En primer lugar instalamos y configuramos todas las dependencias de Spark para Python. De esta forma enlazaremos nuestro entorno con el servidor de Spark. Además configuraremos el entorno Spark con las variables que sean necestarias. 



1. Instalar Java Virtual Machine y distribución de Spark

>**NOTA: la última versión de PySpark es la 3.2.0 [link](https://pypi.org/project/pyspark/#history)**

In [None]:
# Install spark-related dependencies
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.osuosl.org/spark/spark-3.2.0/spark-3.2.0-bin-hadoop2.7.tgz
!tar xf spark-3.2.0-bin-hadoop2.7.tgz


2. Instalar pyspark y findspark

In [None]:
!pip install -q findspark
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 37 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 53.0 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=c5a440810d2be6d7b9cf8c9d644d3fb3f0aac84ad83104e5c9ab81ab71141821
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


3. Establecer las variables de entorno

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.0-bin-hadoop2.7"

## Inicialización 

Vamos a iniciar una sesión de spark simple para testear nuestra instalación

1. Ejecutamos [findspark.init()](https://pypi.org/project/findspark/) para hacer que pyspark sea importable como una biblioteca normal (añade pyspark al sys.path en el entorno de ejecución) 

In [None]:
import findspark
findspark.init()

2. Importamos las librerías necesarias de Spark

In [None]:
import pyspark
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf

Si creamos el contexto sin ningún argumento al respecto  se utilizará la variable de entorno SPARK_HOME, y si no está establecida, se comprobarán otras posibles ubicaciones de instalación. 

Más consideraciones sobre en arranque en el siguiente [enlace](https://github.com/minrk/findspark)

Más detalle sobre las opciones de configuración en el siguiente [enlace](http://spark.apache.org/docs/latest/configuration.html)

In [None]:
conf = SparkConf().set("spark.ui.port", "4050")
# create the context
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()


In [None]:
spark

Para poder ver el interfaz en Colab de alguna manera hace falta ejecutar la siguiente celda creará un túnel *ngrok* que le permitirá seguir comprobando la interfaz de usuario de Spark.

In [None]:
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip
get_ipython().system_raw('./ngrok http 4050 &')


--2021-11-10 18:33:57--  https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
Resolving bin.equinox.io (bin.equinox.io)... 18.205.222.128, 54.237.133.81, 52.202.168.65, ...
Connecting to bin.equinox.io (bin.equinox.io)|18.205.222.128|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 13832437 (13M) [application/octet-stream]
Saving to: ‘ngrok-stable-linux-amd64.zip’


2021-11-10 18:33:59 (7.31 MB/s) - ‘ngrok-stable-linux-amd64.zip’ saved [13832437/13832437]

Archive:  ngrok-stable-linux-amd64.zip
  inflating: ngrok                   


In [None]:
!curl -s http://localhost:4040/api/tunnels | python3 -c "import sys, json; print(json.load(sys.stdin)['tunnels'][0]['public_url'])"

https://190a-35-185-46-128.ngrok.io


# Manipulando datos

In [None]:
estadisticas = spark.read.csv( "/content/laliga201920.csv" , inferSchema = True, header = True , sep=","   )
estadisticas.show()

+------+-------+---------+--------------------+----------+-------------+-------------+-------------------+-----------+-----------+-----------+------------+------+------+-------+--------+-------------------+---------------------+---------+-------+-------------------+-------------------+-------------+--------------+----------------------+--------------------------+-------------------------+-----+---------------------+----------------------+----------------+---------------+------------+-------------+-------------+-----------+-----------------------------+---------------------------------+------------------+-------------+------------------------------+------------+----------------------------------------------+------------------------------------------------+----------------+---------------------------+---------------+--------------+---------------+----------------+-----------------------+-------------------------+----------------------+------------------------+--------------------------+-

- Completar los goles a 0 que vengan nulos
- Sacar el total de goles por equipo

In [None]:
estadisticas = estadisticas.na.fill({'goals': 0})
estadisticas.groupby('team').sum('goals').show()

+------------------+----------+
|              team|sum(goals)|
+------------------+----------+
|          Espanyol|        27|
|           Leganés|        37|
|   Real Valladolid|        31|
|        Villarreal|        62|
|       Real Madrid|        69|
|     Athletic Club|        40|
|        Granada CF|        50|
|         Barcelona|        95|
|     Real Sociedad|        55|
|             Eibar|        38|
|       Valencia CF|        45|
|           Osasuna|        46|
|Atlético de Madrid|        50|
|     Celta de Vigo|        40|
|           Sevilla|        61|
|        Real Betis|        48|
|           Levante|        45|
|            Getafe|        45|
|            Alavés|        34|
|          Mallorca|        39|
+------------------+----------+



* Defensas goleadores que han jugado más de 500 minutos

In [None]:
jugones=estadisticas.where((col("played_time") > '500')
                            &  (col("position") == 'Defender')
                            &  (col("goals") >  '1')).show()

+---------------+-------+---------+--------------------+--------+-------------+-----------------+--------------------+-----------+-----------+-----------+------------+------+------+-------+--------+-------------------+---------------------+---------+-------+-------------------+-------------------+-------------+--------------+----------------------+--------------------------+-------------------------+-----+---------------------+----------------------+----------------+---------------+------------+-------------+-------------+-----------+-----------------------------+---------------------------------+------------------+-------------+------------------------------+------------+----------------------------------------------+------------------------------------------------+----------------+---------------------------+---------------+--------------+---------------+----------------+-----------------------+-------------------------+----------------------+------------------------+----------------

6. Crear una nueva columna en el DataFrame con la división de dos columnas existentes. Redondear esa columna a 2 decimales. 
- Se tienen muchas columnas de acciones bien y acciones totales, crear una columna con el % de acciones bien para ese evento.  Por ejemplo:
    
* entradas ganadas y entradas totales (tackles_won y tackles_total)

In [None]:
estadisticas = estadisticas.na.fill({'tackles_won': 0})
estadisticas = estadisticas.na.fill({'tackles_total':0})
estadisticas = estadisticas.withColumn("%Entradas ganadas", round((estadisticas['tackles_won']/estadisticas['tackles_total']*100)))
estadisticas.select("player_name","tackles_total","tackles_won","%Entradas ganadas").show() 

+--------------------+-------------+-----------+-----------------+
|         player_name|tackles_total|tackles_won|%Entradas ganadas|
+--------------------+-------------+-----------+-----------------+
|     Mauro Arambarri|           72|         33|             46.0|
|      Markel Bergara|            0|          0|             null|
|        Erick Cabaco|           15|         11|             73.0|
|     Leandro Cabrera|           38|         17|             45.0|
|      Marc Cucurella|           75|         52|             69.0|
|       Djené Dakonam|           41|         31|             76.0|
|           Hugo Duro|            6|          5|             83.0|
|    Oghenekaro Etebo|           12|          4|             33.0|
|      Xabier Etxeita|           20|         11|             55.0|
|         Faycal Fajr|            1|          1|            100.0|
|Enrique Gallego P...|           12|          6|             50.0|
|         Raúl García|           25|         15|             6

   * duelos aéreos ganados y duelos aéreos totales (duels_aerial_won y duels_aerial_total)

## SQL

In [None]:
estadisticas.createOrReplaceTempView("OPTA")
.sqlspark("SELECT * \
           FROM OPTA").show()

+------+-------+---------+--------------------+----------+-------------+-------------+-------------------+-----------+-----------+-----------+------------+------+------+-------+--------+-------------------+---------------------+---------+-------+-------------------+-------------------+-------------+--------------+----------------------+--------------------------+-------------------------+-----+---------------------+----------------------+----------------+---------------+------------+-------------+-------------+-----------+-----------------------------+---------------------------------+------------------+-------------+------------------------------+------------+----------------------------------------------+------------------------------------------------+----------------+---------------------------+---------------+--------------+---------------+----------------+-----------------------+-------------------------+----------------------+------------------------+--------------------------+-

In [None]:
spark.sql("SELECT Team, SUM(goals) as goals FROM OPTA GROUP BY Team ORDER BY GOALS DESC").show()

+------------------+-----+
|              Team|goals|
+------------------+-----+
|         Barcelona|   95|
|       Real Madrid|   69|
|        Villarreal|   62|
|           Sevilla|   61|
|     Real Sociedad|   55|
|Atlético de Madrid|   50|
|        Granada CF|   50|
|        Real Betis|   48|
|           Osasuna|   46|
|            Getafe|   45|
|           Levante|   45|
|       Valencia CF|   45|
|     Athletic Club|   40|
|     Celta de Vigo|   40|
|          Mallorca|   39|
|             Eibar|   38|
|           Leganés|   37|
|            Alavés|   34|
|   Real Valladolid|   31|
|          Espanyol|   27|
+------------------+-----+



## Finalización


In [None]:
sc.stop()