## Vamos a crear una sesión de Spark y un DataFrame de Spark sencillo

In [1]:
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
#Check this site for the latest download link https://www.apache.org/dyn/closer.lua/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install pyspark
!pip install py4j

import os
import sys
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"


import findspark
findspark.init()
findspark.find()

import pyspark

from pyspark.sql import DataFrame, SparkSession
from typing import List
import pyspark.sql.types as T
import pyspark.sql.functions as F

spark= SparkSession \
       .builder \
       .appName("Our First Spark Example") \
       .getOrCreate()

spark

Get:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
Get:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Hit:3 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:4 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Get:5 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Get:6 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Get:7 http://archive.ubuntu.com/ubuntu jammy-backports InRelease [127 kB]
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Get:11 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  Packages [1,318 kB]
Get:12 https://r2u.stat.illinois.edu/ubuntu jammy/main amd64 Packages [2,660 kB]
Get:13 https://r2u.stat.illinois.edu/ubuntu jamm

In [2]:
# Evaluating the session as the cell's last expression renders its summary in the notebook.
spark

In [3]:
# Build a tiny Spark DataFrame from Row objects; Spark infers each column's type
# from the Python values (int, float, str, date, datetime).
from datetime import date, datetime
import pandas as pd
from pyspark.sql import Row

df = spark.createDataFrame(
    [
        Row(a=1, b=2., c='string1',
            d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
        Row(a=2, b=3., c='string2',
            d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
        Row(a=4, b=5., c='string3',
            d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0)),
    ]
)
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [4]:
# Print the rows of the Spark DataFrame as a text table (show() displays at most 20 rows by default).
df.show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  4|5.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+



In [5]:
# Collect the Spark DataFrame into a pandas DataFrame on the driver; fine here because df has only three rows.
df_pandas = df.toPandas()

In [6]:
# Display the first rows of the pandas copy.
df_pandas.head()

Unnamed: 0,a,b,c,d,e
0,1,2.0,string1,2000-01-01,2000-01-01 12:00:00
1,2,3.0,string2,2000-02-01,2000-01-02 12:00:00
2,4,5.0,string3,2000-03-01,2000-01-03 12:00:00


# API de Spark, tres formas de trabajar con datos distribuidos
La API de Spark nos ofrece 3 formas de trabajar con datos distribuidos:
1. [RDD](https://spark.apache.org/docs/latest/api/python/reference/pyspark.html) Módulo más básico de Spark: requiere mucho conocimiento de bajo nivel y bastante sofisticación incluso para procesamientos de datos relativamente básicos. Es necesario cuando lo que queremos hacer no está implementado en SQL ni en DataFrame.
2. [SQL](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/index.html) El módulo de SQL nos permite trabajar con los ficheros almacenados en Hadoop y con las tablas Hive utilizando lenguaje SQL.
3. [DataFrame](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.html) El módulo de DataFrame nos permite almacenar en variables de Python objetos del tipo DataFrame, que tienen muchas similitudes con los dataframes de pandas y nos permiten hacer transformaciones de datos de forma más compacta que utilizando RDDs o SQL.

Además, existen otros módulos de Spark que nos permiten desarrollar modelos de Machine Learning, aplicar Pandas a DataFrames de Spark o procesar datos en tiempo real. Todo puede verse en su [documentación](https://spark.apache.org/docs/latest/api/python/index.html).

In [7]:
# The example Spark DataFrame `df` was already built earlier in this notebook with exactly
# the same rows; reuse it instead of repeating the imports and the createDataFrame call.
df.show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  4|5.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+



In [8]:
# Register df as the temporary view "prueba" so it can be queried with plain SQL.
df.createOrReplaceTempView("prueba")
(
    spark
    .sql("select a, b, a+b as a_b from prueba")
    .show()
)

+---+---+---+
|  a|  b|a_b|
+---+---+---+
|  1|2.0|3.0|
|  2|3.0|5.0|
|  4|5.0|9.0|
+---+---+---+



# Input de datos con Spark

Vamos a ver cómo podemos cargar datos utilizando los métodos de Spark. Para ello utilizamos el método [read de Spark](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.html#pyspark.sql.DataFrameReader), con el que se pueden leer multitud de formatos, entre ellos el CSV.

In [15]:
# Google Colab only: mount Google Drive so files under /content/drive/MyDrive can be read.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [18]:
# Load the transfers CSV from Google Drive, taking the column names from the first row.
# No schema is given and inferSchema is not enabled, so every column (including `age`)
# is read as a string.
sdf_football = (
    spark.read
    .option("header", True)
    .csv("/content/drive/MyDrive/Summer22_FootballTransfers.csv")
)

In [None]:
# Preview the first rows of the loaded transfers data.
sdf_football.show()

+--------------------+------------------+---+--------------------+------------------+-------------------+--------------------+---------------+----------------+-------------+---------------+----------------+
|                name|          position|age|         origin_club|league_origin_club|country_origin_club|            new_club|league_new_club|country_new_club|player_valuje|           cost|date_of_transfer|
+--------------------+------------------+---+--------------------+------------------+-------------------+--------------------+---------------+----------------+-------------+---------------+----------------+
|        Amine Gouiri|    Centre-Forward| 22|            OGC Nice|           Ligue 1|             France|    Stade Rennais FC|        Ligue 1|          France|      €42.00m|        €28.00m|      2022-09-01|
|          Umar Sadiq|    Centre-Forward| 25|          UD Almería|            LaLiga|              Spain|       Real Sociedad|         LaLiga|           Spain|      €18.00m

Si tuviésemos conexión con alguna base de datos Hive, podríamos leer las tablas directamente con los métodos spark.table() o spark.sql().

# Transformaciones de datos con Spark
Vamos a ver ejemplos de las transformaciones de datos básicas con Spark DataFrame. Para ello utilizaremos las funciones disponibles en [pyspark.sql.functions](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html).

In [19]:

import pyspark.sql.functions as F

# Column selection: keep only each player's name and age.
print("Ejemplo de seleccion de columnas")
print("-" * 10)
sdf_football.select(F.col("name"), F.col("age")).show()


Ejemplo de seleccion de columnas
----------
+--------------------+---+
|                name|age|
+--------------------+---+
|        Amine Gouiri| 22|
|          Umar Sadiq| 25|
|        Carlos Soler| 25|
|       Manuel Akanji| 27|
|           Wout Faes| 24|
|  Leander Dendoncker| 27|
|      Gaëtan Laborde| 28|
|Pierre-Emerick Au...| 33|
|Jörgen Strand Larsen| 22|
|       Billy Gilmour| 21|
|        James Garner| 21|
|     Duje Caleta-Car| 25|
|      El Bilal Touré| 20|
|    Christopher Wooh| 20|
|              Lázaro| 20|
|            Pep Biel| 25|
|        Johan Mojica| 30|
|     Carlos Vinícius| 27|
|         Arthur Melo| 26|
|     Wilfried Gnonto| 18|
+--------------------+---+
only showing top 20 rows



In [20]:
# Column creation: first a constant literal column, then a derived one (age cast to float and halved).
print("Ejemplo de creacion de columnas")
print("-" * 10)
(
    sdf_football
    .withColumn("Columna_Estatica", F.lit(1))
    .show()
)
(
    sdf_football
    .withColumn("Edad_medios", F.col("age").cast("float") / 2)
    .show()
)



Ejemplo de creacion de columnas
----------
+--------------------+------------------+---+--------------------+------------------+-------------------+--------------------+---------------+----------------+-------------+---------------+----------------+----------------+
|                name|          position|age|         origin_club|league_origin_club|country_origin_club|            new_club|league_new_club|country_new_club|player_valuje|           cost|date_of_transfer|Columna_Estatica|
+--------------------+------------------+---+--------------------+------------------+-------------------+--------------------+---------------+----------------+-------------+---------------+----------------+----------------+
|        Amine Gouiri|    Centre-Forward| 22|            OGC Nice|           Ligue 1|             France|    Stade Rennais FC|        Ligue 1|          France|      €42.00m|        €28.00m|      2022-09-01|               1|
|          Umar Sadiq|    Centre-Forward| 25|          UD Alm

In [21]:
# Filtering: keep transfers whose origin league is not "LaLiga".
# NOTE: in Spark SQL, NULL != "LaLiga" evaluates to NULL, so rows with a missing
# league_origin_club are dropped too; use ~F.col(...).eqNullSafe("LaLiga") to keep them.
print("Ejemplo de filtros")
print("-" * 10)
sdf_football.where(F.col("league_origin_club") != "LaLiga").show()


Ejemplo de filtros
----------
+--------------------+------------------+---+--------------------+------------------+-------------------+--------------------+---------------+----------------+-------------+---------------+----------------+
|                name|          position|age|         origin_club|league_origin_club|country_origin_club|            new_club|league_new_club|country_new_club|player_valuje|           cost|date_of_transfer|
+--------------------+------------------+---+--------------------+------------------+-------------------+--------------------+---------------+----------------+-------------+---------------+----------------+
|        Amine Gouiri|    Centre-Forward| 22|            OGC Nice|           Ligue 1|             France|    Stade Rennais FC|        Ligue 1|          France|      €42.00m|        €28.00m|      2022-09-01|
|       Manuel Akanji|       Centre-Back| 27|   Borussia Dortmund|        Bundesliga|            Germany|     Manchester City| Premier League|

In [22]:
# Aggregations per transfer date: mean age, number of distinct origin clubs and total transfers.
# `age` is a string column here; F.mean relies on Spark's implicit cast to a numeric type.
print("Ejemplo de Agregaciones")
print("-" * 10)

sdf_agg = (
    sdf_football
    .groupBy("date_of_transfer")
    .agg(
        F.mean("age").alias("edad_media"),
        F.countDistinct("origin_club").alias("Clubs_Distintos"),
        F.count("*").alias("TotalTraspasos"),
    )
)
sdf_agg.show()


Ejemplo de Agregaciones
----------
+----------------+------------------+---------------+--------------+
|date_of_transfer|        edad_media|Clubs_Distintos|TotalTraspasos|
+----------------+------------------+---------------+--------------+
|      2022-07-08|25.228235294117646|            388|           425|
|      2022-07-04|25.370666666666665|            334|           375|
|      2022-07-30|             25.63|            180|           200|
|      2022-07-23|24.888392857142858|            211|           225|
|      2022-08-17|24.906666666666666|            198|           225|
|      2022-07-25|24.956989247311828|            329|           375|
|      2022-07-15|24.742904841402336|            508|           600|
|      2022-07-06|            25.135|            363|           400|
|      2022-07-17|25.073333333333334|            139|           150|
|      2022-08-26|24.585454545454546|            223|           275|
|      2022-07-09|24.995535714285715|            207|           225|

In [23]:
# Same aggregates as above, but grouped by both transfer date and destination country.
(
    sdf_football
    .groupBy("date_of_transfer", "country_new_club")
    .agg(
        F.mean("age").alias("edad_media"),
        F.countDistinct("origin_club").alias("Clubs_Distintos"),
        F.count("*").alias("TotalTraspasos"),
    )
    .show()
)

+----------------+------------------+------------------+---------------+--------------+
|date_of_transfer|  country_new_club|        edad_media|Clubs_Distintos|TotalTraspasos|
+----------------+------------------+------------------+---------------+--------------+
|      2022-07-19|             Italy|22.903846153846153|             87|           104|
|      2022-07-11|        San Marino|              30.5|              8|             8|
|      2022-08-01|           Germany|24.529411764705884|             16|            17|
|      2022-08-22|            Russia|             24.15|             14|            20|
|      2022-07-19|            Greece|              26.0|              8|             9|
|      2022-07-13|      South Africa|              24.5|              2|             2|
|      2022-07-25|           Bolivia|              22.5|              2|             2|
|      2022-08-11|           Belgium|22.666666666666668|              3|             3|
|      2022-08-10|            Se

In [24]:
# Joins: attach the per-date aggregates (sdf_agg) to every transfer with a left join on the date,
# so transfers without a matching aggregate row would still be kept.
# The header label was copy-pasted from the aggregation cell; it now names this section.
print("Ejemplo de Cruces")
print("-" * 10)
sdf_joined = sdf_football.join(sdf_agg, on=["date_of_transfer"], how="left")
sdf_joined.show()



Ejemplo de Agregaciones
----------
+----------------+--------------------+------------------+---+--------------------+------------------+-------------------+--------------------+---------------+----------------+-------------+---------------+------------------+---------------+--------------+
|date_of_transfer|                name|          position|age|         origin_club|league_origin_club|country_origin_club|            new_club|league_new_club|country_new_club|player_valuje|           cost|        edad_media|Clubs_Distintos|TotalTraspasos|
+----------------+--------------------+------------------+---+--------------------+------------------+-------------------+--------------------+---------------+----------------+-------------+---------------+------------------+---------------+--------------+
|      2022-09-01|        Amine Gouiri|    Centre-Forward| 22|            OGC Nice|           Ligue 1|             France|    Stade Rennais FC|        Ligue 1|          France|      €42.00m|    

In [25]:

# Pivot: one row per transfer date and one column per origin league, holding the mean age.
# NOTE(review): fillna(0) turns date/league combinations with no transfers into an
# "average age" of 0, which is a placeholder rather than a real age.
print("Ejemplo de Pivot")
print("-" * 10)
(
    sdf_joined
    .groupBy("date_of_transfer")
    .pivot("league_origin_club")
    .agg(F.mean("age").alias("edad_media"))
    .fillna(0)
    .show()
)



Ejemplo de Pivot
----------
+----------------+------------------+------+--------------------+-------------+-------------+-------------+------------------+------------------+------+--------------+------------------+------------------+----------------+----------------+-------------+-------+---------------------+---------------------+---------------------+--------+----------+------------------+------------------+------------------+-------------------+-------------------+------------+----------------+-----------------+-----------------+------------------+------------+------------+------------+------------+------+------------------+----------------+------------------+-------+------------------+------------------+-------+------+---------+-------+---------+-------+------------------+----------+-----------------------+---------+-------+-------------+------------------+---------------+--------------+-------+-------+-----------+-----------+-------------+------------------+------------------+----

In [26]:
# Window functions: number the players inside each origin league from oldest to youngest.
from pyspark.sql.window import Window

# `age` comes from a CSV read without schema inference, so it is a string; cast it before
# ordering, otherwise the sort is lexicographic ("9" would rank above "40").
# Ordering by name as a secondary key makes row_number deterministic when ages tie.
w1 = Window.partitionBy("league_origin_club").orderBy(
    F.col("age").cast("int").desc(),
    F.col("name"),
)
sdf_window = sdf_football.withColumn("fila_edad", F.row_number().over(w1))
sdf_window.filter(F.col("league_origin_club") == "LaLiga").show()
sdf_window.filter(F.col("fila_edad") == 1).show()

+--------------------+------------------+---+--------------------+------------------+-------------------+--------------------+--------------------+----------------+-------------+-------------+----------------+---------+
|                name|          position|age|         origin_club|league_origin_club|country_origin_club|            new_club|     league_new_club|country_new_club|player_valuje|         cost|date_of_transfer|fila_edad|
+--------------------+------------------+---+--------------------+------------------+-------------------+--------------------+--------------------+----------------+-------------+-------------+----------------+---------+
|         Diego López|        Goalkeeper| 40|RCD Espanyol Barc...|            LaLiga|              Spain|      Rayo Vallecano|              LaLiga|           Spain|      €500Th.|free transfer|      2022-07-02|        1|
|          Dani Alves|        Right-Back| 39|        FC Barcelona|            LaLiga|              Spain|          UNAM 

# Output de datos con Spark
Vamos a ver cómo escribir datos con Spark, utilizando el método [write](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.html#pyspark.sql.DataFrameWriter). Si tuviésemos acceso a una base de datos, también podríamos escribir ahí nuestros resultados.

In [27]:
# Para borrar directorios
! rm -R result/resultado*

rm: cannot remove 'result/resultado*': No such file or directory


In [28]:
# Apply window w1 to the joined data and keep only the first-ranked row per origin league.
sdf_resultado = (
    sdf_joined
    .withColumn("fila_edad", F.row_number().over(w1))
    .filter(F.col("fila_edad") == 1)
)

sdf_resultado.show()

+----------------+-------------------+------------------+---+--------------------+--------------------+-------------------+--------------------+----------------+----------------+-------------+-------------+------------------+---------------+--------------+---------+
|date_of_transfer|               name|          position|age|         origin_club|  league_origin_club|country_origin_club|            new_club| league_new_club|country_new_club|player_valuje|         cost|        edad_media|Clubs_Distintos|TotalTraspasos|fila_edad|
+----------------+-------------------+------------------+---+--------------------+--------------------+-------------------+--------------------+----------------+----------------+-------------+-------------+------------------+---------------+--------------+---------+
|      2022-08-31|          Zhi Zheng|Defensive Midfield| 42|        Without Club|                NULL|               NULL|        Guangzhou FC|    Super League|           China|       €25Th.|       

In [29]:
# Write the result three ways; each target path is a directory of part files, not a single file.
# mode("overwrite") lets the cell be re-run without failing on an existing path, and
# header=True keeps the column names in the CSV output.
sdf_resultado.write.mode("overwrite").csv("result/resultado1.csv", header=True)
# repartition(2) spreads the rows over two partitions, i.e. two CSV part files.
sdf_resultado.repartition(2).write.mode("overwrite").csv("result/resultado2.csv", header=True)
# Hash-partition by transfer date before writing Parquet; the number of part files depends on
# the data and Spark's shuffle settings.
sdf_resultado.repartition("date_of_transfer").write.mode("overwrite").parquet("result/resultado3")

In [30]:
# resultado1.csv is a directory: list the part file(s) and the _SUCCESS marker Spark wrote into it.
!ls -lrth result/resultado1.csv

total 76K
-rw-r--r-- 1 root root 73K Feb 25 19:11 part-00000-562c0455-52ea-4bc9-bea0-b85d0eaf15c0-c000.csv
-rw-r--r-- 1 root root   0 Feb 25 19:11 _SUCCESS


In [31]:
# After repartition(2) the output directory holds two part files.
!ls -lrth result/resultado2.csv

total 80K
-rw-r--r-- 1 root root 37K Feb 25 19:12 part-00000-e672c5ed-01fb-4258-abad-9fc74a68d829-c000.csv
-rw-r--r-- 1 root root 37K Feb 25 19:12 part-00001-e672c5ed-01fb-4258-abad-9fc74a68d829-c000.csv
-rw-r--r-- 1 root root   0 Feb 25 19:12 _SUCCESS


In [32]:
# List the Parquet part file(s) written for resultado3.
!ls -lrth result/resultado3

total 40K
-rw-r--r-- 1 root root 40K Feb 25 19:12 part-00000-659d5d5c-c5b2-4cc0-b1e3-f090b9bb5551-c000.snappy.parquet
-rw-r--r-- 1 root root   0 Feb 25 19:12 _SUCCESS
