#Sección 2 - Descarga e Instalación de Spark en Google Colab

In [None]:
# Instalar SDK Java 8

!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
# Descargar Spark 3.3.3

!wget -q http://apache.osuosl.org/spark/spark-3.3.3/spark-3.3.3-bin-hadoop3.tgz

In [None]:
# Descomprimir el archivo descargado de Spark

!tar xf spark-3.3.3-bin-hadoop3.tgz

In [None]:
# Establecer las variables de entorno

import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.3-bin-hadoop3"


In [None]:
# Instalar la librería findspark

!pip install -q findspark

In [None]:
# Instalar pyspark

!pip install -q pyspark

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m2.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [None]:
# Verificar instalacion

import findspark

findspark.init()


In [None]:
# Crear la sesion de Spark
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName('Curso Pyspark').getOrCreate()

In [None]:
#Crear Spark context
sc = spark.sparkContext

#Ejercicios Seccion 3 - Introducción RDD en Spark

1. Cree una sesión de Spark con nombre Cap2 y asegúrese de que emplea todos los cores disponibles para ejecutar en su ambiente de trabajo.

2. Cree dos RDD vacíos, uno de ellos no debe contener particiones y el otro debe tener 5 particiones. Utilice vías diferentes para crear cada RDD.

3. Cree un RDD que contenga los números primos que hay entre 1 y 20.

4. Cree un nuevo RDD a partir del RDD creado en el ejercicio anterior el cuál solo contenga los números primos mayores a 10.

5. Descargue el archivo de texto adjunto a esta lección como recurso y guárdelo en una carpeta llamada data en el ambiente de trabajo de Colab.
  * Cree un RDD a partir de este archivo de texto en donde todo el documento esté contenido en un solo registro. ¿Cómo podría saber la dirección donde está guardado el archivo de texto a partir del RDD creado?

  * Si necesitara crear un RDD a partir del archivo de texto cargado previamente en donde cada línea del archivo fuera un registro del RDD, ¿cómo lo haría?

In [None]:
#1
spark = SparkSession.builder.master("local[*]").appName("Cap2").getOrCreate()

In [None]:
#2
rdd_vacio = sc.emptyRDD()
rdd_vacio5 = sc.parallelize([],5)
rdd_vacio.getNumPartitions()

In [None]:
#3
rdd20 = sc.parallelize(range(1,21))
rdd20.collect()
rdd_primos = sc.parallelize([1,2,3,5,7,11,13,17,19])
rdd_primos.collect()

In [None]:
#4
rdd10 = rdd_primos.filter(lambda x: x>10).collect()
print(rdd10)

In [None]:
#5
rdd_textfile = sc.wholeTextFiles('/content/el_valor_del_big_data.txt')
rdd_textfile.collect()
rdd_linea = sc.textFile('/content/el_valor_del_big_data.txt')
rdd_linea.collect()

#Ejercicios Seccion 4 - Transformaciones RDD

In [None]:
#Crear un RDD llamado lenguajes que contenga los siguientes lenguajes de programación: Python, R, C, Scala, Rugby y SQL.
lenguajes = sc.parallelize(['R','Python','C','Scala','Rugby','SQL'])
lenguajes.collect()

['R', 'Python', 'C', 'Scala', 'Rugby', 'SQL']

In [None]:
#Obtener un nuevo RDD a partir del RDD lenguajes donde todos los lenguajes de programación estén en mayúsculas.
lengmayusculas = lenguajes.map(lambda x:x.upper())
lengmayusculas.collect()

['R', 'PYTHON', 'C', 'SCALA', 'RUGBY', 'SQL']

In [None]:
#Obtener un nuevo RDD a partir del RDD lenguajes donde todos los lenguajes de programación estén en minúsculas.
lengminusculas = lenguajes.map(lambda x: x.lower())
lengminusculas.collect()

['r', 'python', 'c', 'scala', 'rugby', 'sql']

In [None]:
#Crear un nuevo RDD que solo contenga aquellos lenguajes de programación que comiencen con la letra R.
lengR = lenguajes.filter(lambda x: x.startswith('R'))
lengR.collect()

['R', 'Rugby']

In [None]:
#Crear un RDD llamado pares que contenga los números pares existentes en el intervalo [20;30].
rdd2030 = sc.parallelize(range(20,31))
rdd2030.collect()
pares = rdd2030.filter(lambda x: x%2==0)
pares.collect()

[20, 22, 24, 26, 28, 30]

In [None]:
#Crear el RDD llamado sqrt, este debe contener la raíz cuadrada de los elementos que componen el RDD pares.
import math
sqrt = pares.map(lambda x: math.sqrt(x))
sqrt.collect()

[4.47213595499958,
 4.69041575982343,
 4.898979485566356,
 5.0990195135927845,
 5.291502622129181,
 5.477225575051661]

In [None]:
#Obtener una lista compuesta por los números pares en el intervalo [20;30] y sus respectivas raíces cuadradas.
parraiz=pares.flatMap(lambda x: (x,math.sqrt(x)))
parraiz.collect()

[20,
 4.47213595499958,
 22,
 4.69041575982343,
 24,
 4.898979485566356,
 26,
 5.0990195135927845,
 28,
 5.291502622129181,
 30,
 5.477225575051661]

In [None]:
#Elevar el número de particiones del RDD sqrt a 20.
sqrt20 = sqrt.repartition(20)
sqrt20.getNumPartitions()

20

In [None]:
#Reducir el número de particiones del RDD sqrt20 a 5.
sqrt5 = sqrt20.coalesce(5)
sqrt5.getNumPartitions()

5

In [None]:
#Crear un RDD del tipo clave valor a partir de los datos adjuntos como recurso a esta lección.
#Tener en cuenta que deberá procesar el RDD leído para obtener el resultado solicitado.
#Suponer que el RDD resultante de tipo clave valor refleja las transacciones realizadas por número de cuentas.
#Obtenga el monto total por cada cuenta.
transacciones = sc.textFile('/content/transacciones')
transacciones.collect()

['(1001, 52.3)',
 '(1005, 20.8)',
 '(1001, 10.1)',
 '(1004, 52.7)',
 '(1005, 20.7)',
 '(1002, 85.3)',
 '(1004, 20.9)']

In [None]:
#Sustituimos parentesis por espacios
def proceso(s):
  return tuple(s.replace('(','').replace(')','').split(', ' ))


In [None]:
rddclavevalor= transacciones.map(proceso)
rddclavevalor.collect()

[('1001', '52.3'),
 ('1005', '20.8'),
 ('1001', '10.1'),
 ('1004', '52.7'),
 ('1005', '20.7'),
 ('1002', '85.3'),
 ('1004', '20.9')]

In [None]:
#Transformar la cantidad a float
rddclavevalor2 = rddclavevalor.map(lambda x: (x[0],float(x[1])))
rddclavevalor2.collect()

[('1001', 52.3),
 ('1005', 20.8),
 ('1001', 10.1),
 ('1004', 52.7),
 ('1005', 20.7),
 ('1002', 85.3),
 ('1004', 20.9)]

In [None]:
monto = rddclavevalor2.reduceByKey(lambda x,y: x+y)
monto.collect()

[('1002', 85.3), ('1001', 62.4), ('1005', 41.5), ('1004', 73.6)]

#Ejercicios Seccion 5 - Acciones sobre un RDD

In [None]:
#Crear  un RDD llamado importes a partir del archivo adjunto a esta lección como recurso.
importes = sc.textFile('/content/num.txt')
importes.collect()

['70',
 '69',
 '16',
 '80',
 '85',
 '81',
 '93',
 '5',
 '78',
 '21',
 '13',
 '77',
 '93',
 '31',
 '16',
 '84',
 '64',
 '29',
 '45',
 '24',
 '64',
 '31',
 '2',
 '55',
 '51',
 '92',
 '72',
 '84',
 '22',
 '50',
 '17',
 '5',
 '96',
 '63',
 '67',
 '82',
 '52',
 '38',
 '86',
 '23',
 '90',
 '96',
 '53',
 '95',
 '10',
 '28',
 '75',
 '72',
 '67',
 '82',
 '40',
 '60',
 '34',
 '26',
 '47',
 '55',
 '10',
 '46',
 '86',
 '46',
 '64',
 '37',
 '31',
 '43',
 '74',
 '44',
 '87',
 '7',
 '54',
 '95',
 '43',
 '79',
 '79',
 '53',
 '62',
 '13',
 '37',
 '45',
 '40',
 '81',
 '50',
 '41',
 '30',
 '81',
 '27',
 '27',
 '49',
 '97',
 '73',
 '99',
 '77',
 '64',
 '47',
 '68',
 '8',
 '79',
 '13',
 '49',
 '75',
 '2',
 '23',
 '94',
 '21',
 '2',
 '10',
 '9',
 '19',
 '75',
 '7',
 '27',
 '55',
 '38',
 '10',
 '39',
 '73',
 '21',
 '81',
 '1',
 '61',
 '62',
 '5',
 '91',
 '68',
 '35',
 '81',
 '91',
 '34',
 '24',
 '63',
 '61',
 '32',
 '11',
 '74',
 '43',
 '98',
 '18',
 '25',
 '33',
 '31',
 '32',
 '42',
 '97',
 '11',
 '28',
 '1

In [None]:
#¿Cuántos registros tiene el RDD importes?
importes.count()

233

In [None]:
# ¿Cuál es el valor mínimo y máximo del RDD importes?
importes.max()
#importes.min()

'99'

In [None]:
#Crear un RDD top15 que contenga los 15 mayores valores del RDD importes.
#Tenga en cuenta que pueden repetirse los valores.
#Por último, escriba el RDD top15 como archivo de texto en la carpeta data/salida
top15 = importes.top(15)
rddtop15 = sc.parallelize(top15)
rddtop15.collect()
rdd2 = rddtop15.coalesce(1)
rdd2.saveAsTextFile('/content/salida3')

In [None]:
#Crear una función llamada factorial que calcule el factorial de un número dado como parámetro.
#Utilice RDDs para el cálculo.
def factorial(num):
  if num == 0:
    return 1
  else:
    rdd = sc.parallelize(list(range(1,num+1)))
    return rdd.reduce(lambda x,y: x*y)


In [None]:
factorial(4)

24

#

# Ejercicios Seccion 6 - Aspectos avanzados RDD

2. Si se conoce que a cada venta hay que restarle un importe fijo igual a 10 pesos por temas de impuestos

- ¿Cómo restaría este impuesto de cada venta utilizando una variable broadcast para acelerar el proceso?
- Cree un RDD llamado ventas_sin_impuestos a partir de la propuesta del inciso a que contenga las ventas sin impuestos.
- Destruya la variable broadcast creada luego de emplearla para crear el RDD del inciso b.

3. Persista el RDD ventas_sin_impuestos en los siguientes niveles de persistencia.
- Memoria.
- Disco solamente
- Memoria y disco.



In [None]:
#Crear un RDD importes a partir de los datos adjuntos a esta lección como recurso.
#Emplear acumuladores para obtener el total de ventas realizadas y el importe total de las ventas.
importes = sc.textFile('/content/rdd.txt')
importes.collect()
acumuladorventas = sc.accumulator(0)
acumuladorcantidad=sc.accumulator(0)
importesf = importes.map(lambda x: float(x))
importes.foreach(lambda x: acumuladorventas.add(1))
importesf.foreach(lambda x: acumuladorcantidad.add(x))
print(acumuladorventas.value)
print(acumuladorcantidad.value)

10000
5042335.0


In [None]:
#2.1 2.2
br_impuesto = sc.broadcast(10)
ventassinimp = importesf.map(lambda x: x-br_impuesto.value)
ventassinimp.collect()

In [None]:
#2.3
br_impuesto.destroy()

In [None]:
#3.1
from pyspark.storagelevel import StorageLevel
ventassinimp.persist(StorageLevel.MEMORY_ONLY)

PythonRDD[85] at collect at <ipython-input-54-153e771494e5>:3

In [None]:
#3.2
ventassinimp.unpersist()
ventassinimp.persist(StorageLevel.DISK_ONLY)

PythonRDD[85] at collect at <ipython-input-54-153e771494e5>:3

In [None]:
#3.3
ventassinimp.unpersist()
ventassinimp.persist(StorageLevel.MEMORY_AND_DISK)

PythonRDD[85] at collect at <ipython-input-54-153e771494e5>:3

# Ejercicios Seccion 7 -SparkSQL
Los datos adjuntos a esta lección forman parte de la base de datos [NeurIPS 2020] Data Science for COVID-19 (DS4C) disponible en Kaggle. Estos datos hacen referencia a los casos de contagio de covid-19 en Corea del Sur. El archivo csv Case contiene los casos reportados y el archivo csv PatientInfo contiene la información de los pacientes.
1. A partir del archivo csv Case, determine las tres ciudades con más casos confirmados de la enfermedad. La salida debe contener tres columnas: provincia, ciudad y casos confirmados. El resultado debe contener exactamente los tres nombre de ciudades con más casos confirmados ya que no se admiten otros valores.
2. Cree un dataframe a partir del archivo csv PatientInfo. Asegúrese de que su dataframe no contenga pacientes duplicados.
3. ¿Cuántos pacientes tienen informado por quién se contagiaron(columna infected_by)? Obtenga solo los pacientes que tengan informado por quién se contagiaron.
4. A partir de la salida del inciso anterior obtenga solo los pacientes femeninos. La salida no debe contener las columnas released_date y deceased_date.
5. Establezca el número de particiones del dataframe resultante del inciso anterior en dos. Escriba el dataframe resultante en un archivo parquet. La salida debe estar particionada por la provincia y el modo de escritura debe ser overwrite.



In [None]:
dfcasos = spark.read.option('header','true').option('inferSchema','true').csv('/content/Case.csv')
dfcasos.show()

In [None]:
dfcasos.printSchema()

In [None]:
from pyspark.sql.functions import desc,col

In [None]:
#7.1
dfcasos.filter((col('city')!='-') & (col('city')!='from other city')).sort(desc('confirmed')).select('province','city','confirmed').show()

In [None]:
info = spark.read.option('header','true').option('inferSchema','true').csv('/content/PatientInfo.csv')
info.printSchema()

In [None]:
info.select('patient_id').count()

In [None]:
info.select('patient_id').distinct().count()

In [None]:
infosd = info.dropDuplicates(['patient_id'])
infosd.count()

5164

In [None]:
#7.3
infosd.filter(col('infected_by')!='NULL').count()

1346

In [None]:
dfsinnull = infosd.na.drop(subset=['infected_by'])
dfsinnull.show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|1000000003|  male|50s|  Korea|   Seoul|   Jongno-gu|contact with patient| 2002000001|            17|              NULL|    2020-01-30|   2020-02-19|         NULL|released|
|1000000005|female|20s|  Korea|   Seoul| Seongbuk-gu|contact with patient| 1000000002|             2|              NULL|    2020-01-31|   2020-02-24|         NULL|released|
|1000000006|female|50s|  Korea|   Seoul|   Jongno-gu|contact with patient| 1000000003|            43|              NULL|    2020-01-31|

In [None]:
#7.4
finaldf = infosd.filter((col('infected_by')!='NULL') & (col('sex')=='female')).drop('deceased_date','released_date')

In [None]:
finaldf.show(truncate=False)

In [None]:
#7.5
finaldf.coalesce(2).write.partitionBy('province').mode('overwrite').parquet('/content/data')

#Ejercicios Sección 8 - Spark SQL avanzado
Los datos adjuntos a esta lección forman parte de la base de datos Football Data from Transfermarkt de Kaggle. El conjunto de datos se compone de varios archivos CSV con información sobre competiciones, juegos, clubes, jugadores y apariciones.

1. Determine los tres países con mayor número de jugadores(jugadores nacidos en ese país). El resultado debe estar ordenado de forma descendente.

2. Obtenga la lista de jugadores con tarjeta roja. La salida debe contener dos columnas, el nombre de pila del jugador y la cantidad de tarjetas rojas que tiene.

3. ¿Cuántos juegos se jugaron en la Premier League? La salida debe contener dos columnas, el nombre de la liga y la cantidad de juegos que se jugaron en ella.

4. Obtenga las tres ligas con mayor número de asistencia de público teniendo en cuenta todos los juegos que se jugaron en ellas. El resultado debe estar ordenado de forma descendente y tener dos columnas, el nombre de la liga y la asistencia total.

In [None]:
# Crear la sesion de Spark
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName('Curso Pyspark').getOrCreate()

In [None]:
#Generamos los dataframes
appearances = spark.read.option('header','true').option('inferSchema','true').csv('/content/appearances.csv')
clubs = spark.read.option('header','true').option('inferSchema','true').csv('/content/clubs.csv',header=True)
competitions = spark.read.option('header','true').option('inferSchema','true').csv('/content/competitions.csv',header=True)
games = spark.read.option('header','true').option('inferSchema','true').csv('/content/games.csv',header=True)
leagues = spark.read.option('header','true').option('inferSchema','true').csv('/content/leagues.csv',header=True)
players = spark.read.option('header','true').option('inferSchema','true').csv('/content/players.csv',header=True)

In [None]:
players.show()

+---------+---------------+--------------------+--------------------+----------------+----------------------+-------------+----------+------------------+-----+------------+-------------------+---------------------------+--------------------+
|player_id|current_club_id|                name|         pretty_name|country_of_birth|country_of_citizenship|date_of_birth|  position|      sub_position| foot|height_in_cm|market_value_in_gbp|highest_market_value_in_gbp|                 url|
+---------+---------------+--------------------+--------------------+----------------+----------------------+-------------+----------+------------------+-----+------------+-------------------+---------------------------+--------------------+
|    38790|          28095|      dmitri-golubov|      Dmitri Golubov|           UdSSR|                Russia|   1985-06-24|    Attack|    Centre-Forward| Both|         178|               NULL|                   675000.0|https://www.trans...|
|   106539|          28095|  ale

In [None]:
players.printSchema()

root
 |-- player_id: integer (nullable = true)
 |-- current_club_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- pretty_name: string (nullable = true)
 |-- country_of_birth: string (nullable = true)
 |-- country_of_citizenship: string (nullable = true)
 |-- date_of_birth: date (nullable = true)
 |-- position: string (nullable = true)
 |-- sub_position: string (nullable = true)
 |-- foot: string (nullable = true)
 |-- height_in_cm: integer (nullable = true)
 |-- market_value_in_gbp: double (nullable = true)
 |-- highest_market_value_in_gbp: double (nullable = true)
 |-- url: string (nullable = true)



In [None]:
#8.1
from pyspark.sql.functions import col, desc
players.groupBy('country_of_birth').count().sort(desc('count')).filter(col('country_of_birth')!='NULL').show()

+-----------------+-----+
| country_of_birth|count|
+-----------------+-----+
|           France| 1694|
|            Spain| 1388|
|            Italy| 1312|
|          England| 1273|
|          Germany| 1154|
|      Netherlands| 1137|
|           Brazil| 1087|
|           Turkey| 1085|
|         Portugal|  803|
|           Greece|  725|
|            UdSSR|  721|
|          Ukraine|  671|
|           Russia|  634|
|         Scotland|  594|
|          Belgium|  580|
|          Denmark|  444|
|        Argentina|  432|
|Jugoslawien (SFR)|  354|
|           Sweden|  186|
|          Nigeria|  182|
+-----------------+-----+
only showing top 20 rows



In [None]:
appearances.printSchema()

root
 |-- player_id: integer (nullable = true)
 |-- game_id: integer (nullable = true)
 |-- appearance_id: string (nullable = true)
 |-- competition_id: string (nullable = true)
 |-- player_club_id: integer (nullable = true)
 |-- goals: integer (nullable = true)
 |-- assists: integer (nullable = true)
 |-- minutes_played: integer (nullable = true)
 |-- yellow_cards: integer (nullable = true)
 |-- red_cards: integer (nullable = true)



In [None]:
appearances.show()

+---------+-------+-------------+--------------+--------------+-----+-------+--------------+------------+---------+
|player_id|game_id|appearance_id|competition_id|player_club_id|goals|assists|minutes_played|yellow_cards|red_cards|
+---------+-------+-------------+--------------+--------------+-----+-------+--------------+------------+---------+
|    52453|2483937|2483937_52453|           RU1|         28095|    0|      0|            90|           0|        0|
|    67064|2479929|2479929_67064|           RU1|         28095|    0|      0|            90|           0|        0|
|    67064|2483937|2483937_67064|           RU1|         28095|    0|      0|            90|           0|        0|
|    67064|2484582|2484582_67064|           RU1|         28095|    0|      0|            55|           0|        0|
|    67064|2485965|2485965_67064|           RU1|         28095|    0|      0|            90|           0|        0|
|    67064|2487345|2487345_67064|           RU1|         28095|    0|   

In [None]:
#8.2
df = appearances.join(players,appearances['player_id']==players['player_id'])
df.filter(col('red_cards')>0).groupBy('pretty_name').count().select('pretty_name','count').sort(desc('count')).show()

+--------------------+-----+
|         pretty_name|count|
+--------------------+-----+
|             Marcelo|    6|
|    Domenico Berardi|    5|
|             Rodrigo|    5|
|        Granit Xhaka|    5|
|          Thomas Lam|    5|
|     Mario Balotelli|    5|
|              Hilton|    5|
|          Joao Pedro|    4|
|          David Luiz|    4|
|      Rafik Halliche|    4|
|   Kalidou Koulibaly|    4|
|          Ante Rebic|    4|
|     Yannick Cahuzac|    4|
|     Damien Da Silva|    4|
|       Ramon Leeuwin|    4|
|Timothee Kolodzie...|    4|
|     Roberto Soriano|    4|
|     Rodrigo De Paul|    4|
|         Ivan Ordets|    4|
|     Stefan Mitrovic|    4|
+--------------------+-----+
only showing top 20 rows



In [None]:
games.printSchema()

root
 |-- game_id: integer (nullable = true)
 |-- competition_code: string (nullable = true)
 |-- season: integer (nullable = true)
 |-- round: string (nullable = true)
 |-- date: date (nullable = true)
 |-- home_club_id: integer (nullable = true)
 |-- away_club_id: integer (nullable = true)
 |-- home_club_goals: integer (nullable = true)
 |-- away_club_goals: integer (nullable = true)
 |-- home_club_position: integer (nullable = true)
 |-- away_club_position: integer (nullable = true)
 |-- stadium: string (nullable = true)
 |-- attendance: integer (nullable = true)
 |-- referee: string (nullable = true)
 |-- url: string (nullable = true)



In [None]:
games.show()

+-------+----------------+------+-------------+----------+------------+------------+---------------+---------------+------------------+------------------+--------------------+----------+--------------------+--------------------+
|game_id|competition_code|season|        round|      date|home_club_id|away_club_id|home_club_goals|away_club_goals|home_club_position|away_club_position|             stadium|attendance|             referee|                 url|
+-------+----------------+------+-------------+----------+------------+------------+---------------+---------------+------------------+------------------+--------------------+----------+--------------------+--------------------+
|2457642|            NLSC|  2014|        Final|2014-08-03|        1269|         610|              1|              0|              NULL|              NULL| Johan Cruijff ArenA|     42000|      Danny Makkelie|https://www.trans...|
|2639088|            BESC|  2013|        Final|2014-07-20|          58|         498|

In [None]:
leagues.printSchema()

root
 |-- league_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- confederation: string (nullable = true)



In [None]:
#8.3
games.filter(col('competition_code')=='GB1').join(leagues,col('competition_code')==col('league_id')).groupBy('name').count().show()


+--------------+-----+
|          name|count|
+--------------+-----+
|premier-league| 2809|
+--------------+-----+



In [None]:
#8.4
games.groupBy('competition_code').sum('attendance').join(leagues,col('competition_code')==col('league_id')).sort(desc('sum(attendance)')).select('name','sum(attendance)').show()

+--------------------+---------------+
|                name|sum(attendance)|
+--------------------+---------------+
|      premier-league|       86964852|
|          bundesliga|       78102473|
|              laliga|       62943533|
|             serie-a|       53475147|
|             ligue-1|       51593963|
|          eredivisie|       34572418|
|        premier-liga|       20878744|
|            liga-nos|       20072843|
|  jupiler-pro-league|       17817099|
|           super-lig|       17455236|
|scottish-premiership|       17379753|
|         superligaen|        7945555|
|      super-league-1|        6417136|
|        premier-liga|        4944837|
+--------------------+---------------+



#Ejercicios Sección 9 - Funciones en Spark SQL
Adjunto como recurso a esta lección están los archivos movies.csv y movie_ratings.csv. En ambos archivos las columnas están delimitadas por un pip(“|”).

Cada línea del archivo movies.csv representa a un actor que actuó en una película. Si una película tiene diez actores, habrá diez filas para esa película en particular.

1. Calcule la cantidad de películas en las que participó cada actor. La salida debe tener dos columnas: actor y conteo. La salida debe ordenarse por el conteo en orden descendente.

2. Calcule la cantidad de películas producidas cada año. La salida debe tener tres columnas: año, siglo al que pertenece el año y conteo. La salida debe ordenarse por el conteo en orden descendente.

3. Obtenga la película con la calificación más alta por año. La salida debe tener solo una película por año y debe contener tres columnas: año, título de la película y valoración.

In [None]:
movies = spark.read.option('sep','|').option('header','true').option('inferSchema','true').csv('/content/movies.csv')
movies.show()
movies.printSchema()

+-----------------+--------------------+----+
|            actor|            pelicula| año|
+-----------------+--------------------+----+
|McClure, Marc (I)|       Freaky Friday|2003|
|McClure, Marc (I)|        Coach Carter|2005|
|McClure, Marc (I)|         Superman II|1980|
|McClure, Marc (I)|           Apollo 13|1995|
|McClure, Marc (I)|            Superman|1978|
|McClure, Marc (I)|  Back to the Future|1985|
|McClure, Marc (I)|Back to the Futur...|1990|
|Cooper, Chris (I)|  Me, Myself & Irene|2000|
|Cooper, Chris (I)|         October Sky|1999|
|Cooper, Chris (I)|              Capote|2005|
|Cooper, Chris (I)|The Bourne Supremacy|2004|
|Cooper, Chris (I)|         The Patriot|2000|
|Cooper, Chris (I)|            The Town|2010|
|Cooper, Chris (I)|          Seabiscuit|2003|
|Cooper, Chris (I)|      A Time to Kill|1996|
|Cooper, Chris (I)|Where the Wild Th...|2009|
|Cooper, Chris (I)|         The Muppets|2011|
|Cooper, Chris (I)|     American Beauty|1999|
|Cooper, Chris (I)|             Sy

In [None]:
#9.1
from pyspark.sql.functions import col,desc
movies.groupBy(col('actor')).count().select(col('actor'),col('count')).sort(desc('count')).show()

+-------------------+-----+
|              actor|count|
+-------------------+-----+
|   Tatasciore, Fred|   38|
|      Welker, Frank|   38|
| Jackson, Samuel L.|   32|
|      Harnell, Jess|   31|
|        Damon, Matt|   27|
|      Willis, Bruce|   27|
|  Cummings, Jim (I)|   26|
|         Hanks, Tom|   25|
|   Lynn, Sherry (I)|   25|
|    McGowan, Mickie|   25|
|    Bergen, Bob (I)|   25|
|      Proctor, Phil|   24|
|   Wilson, Owen (I)|   23|
|        Cruise, Tom|   23|
|         Pitt, Brad|   23|
|Freeman, Morgan (I)|   22|
|Williams, Robin (I)|   22|
|       Depp, Johnny|   22|
|     Morrison, Rana|   22|
|      Diaz, Cameron|   21|
+-------------------+-----+
only showing top 20 rows



In [None]:
movra = spark.read.option('sep','|').option('header','true').option('inferSchema','true').csv('/content/movie_ratings.csv')
movra.show(truncate=False)
movra.printSchema()

+----------+--------------------------+----+
|valoracion|pelicula                  |año |
+----------+--------------------------+----+
|1.6339    |'Crocodile' Dundee II     |1988|
|7.6177    |10                        |1979|
|1.2864    |10 Things I Hate About You|1999|
|0.3243    |10,000 BC                 |2008|
|0.3376    |101 Dalmatians            |1996|
|0.5218    |102 Dalmatians            |2000|
|12.8205   |1066                      |2012|
|0.6829    |12                        |2007|
|7.4061    |12 Rounds                 |2009|
|2.3677    |127 Hours                 |2010|
|1.3585    |13 Going on 30            |2004|
|8.4034    |13 game sayawng           |2006|
|0.59      |1408                      |2007|
|4.4292    |15 Minutes                |2001|
|2.2118    |16 Blocks                 |2006|
|1.0491    |17 Again                  |2009|
|3.9265    |1941                      |1979|
|10.4757   |2 Days in the Valley      |1996|
|0.4       |2 Fast 2 Furious          |2003|
|11.1111  

In [None]:
#9.2
from pyspark.sql.functions import when,lit,countDistinct
movies.groupBy('año').agg(
    countDistinct('pelicula').alias('conteo')
).withColumn('siglo', when((col('año')>=1900) & (col('año')<2000), lit('XX')).otherwise(lit('XXI'))).orderBy(desc('conteo')).show()

+----+------+-----+
| año|conteo|siglo|
+----+------+-----+
|2006|    86|  XXI|
|2004|    86|  XXI|
|2011|    86|  XXI|
|2005|    85|  XXI|
|2008|    82|  XXI|
|2002|    81|  XXI|
|2010|    78|  XXI|
|2000|    77|  XXI|
|2003|    76|  XXI|
|2007|    75|  XXI|
|2001|    71|  XXI|
|2009|    68|  XXI|
|1999|    67|   XX|
|1997|    66|   XX|
|1998|    59|   XX|
|1996|    42|   XX|
|2012|    32|  XXI|
|1995|    25|   XX|
|1994|    16|   XX|
|1986|    16|   XX|
+----+------+-----+
only showing top 20 rows



In [None]:
movra.show()


+----------+--------------------+----+
|valoracion|            pelicula| año|
+----------+--------------------+----+
|    1.6339|'Crocodile' Dunde...|1988|
|    7.6177|                  10|1979|
|    1.2864|10 Things I Hate ...|1999|
|    0.3243|           10,000 BC|2008|
|    0.3376|      101 Dalmatians|1996|
|    0.5218|      102 Dalmatians|2000|
|   12.8205|                1066|2012|
|    0.6829|                  12|2007|
|    7.4061|           12 Rounds|2009|
|    2.3677|           127 Hours|2010|
|    1.3585|      13 Going on 30|2004|
|    8.4034|     13 game sayawng|2006|
|      0.59|                1408|2007|
|    4.4292|          15 Minutes|2001|
|    2.2118|           16 Blocks|2006|
|    1.0491|            17 Again|2009|
|    3.9265|                1941|1979|
|   10.4757|2 Days in the Valley|1996|
|       0.4|    2 Fast 2 Furious|2003|
|   11.1111|              2 Guns|2013|
+----------+--------------------+----+
only showing top 20 rows



In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number,max
windowSpec = Window.partitionBy('año').orderBy(desc('valoracion'))
windowSpecAgg = Window.partitionBy('año')

In [None]:
#9.3
movra.withColumn('row_number', row_number().over(windowSpec)).withColumn('max_val',max('valoracion').over(windowSpecAgg)).filter(col('row_number')==1
  ).select('pelicula','año','max_val').show()

+--------------------+----+-------+
|            pelicula| año|max_val|
+--------------------+----+-------+
|Snow White and th...|1937| 2.2207|
|    The Wizard of Oz|1939| 7.9215|
|           Pinocchio|1940| 7.8557|
|               Bambi|1942| 1.5053|
|   Song of the South|1946|  7.602|
|          Cinderella|1950| 9.4226|
|           Peter Pan|1953| 5.4756|
|         Rear Window|1954|10.7625|
|  Lady and the Tramp|1955| 5.1258|
|Around the World ...|1956|14.0607|
|     Sleeping Beauty|1959| 6.3919|
|              Psycho|1960|10.6375|
|One Hundred and O...|1961| 0.6726|
|     The Longest Day|1962|12.8866|
|It's a Mad Mad Ma...|1963|  6.626|
|        My Fair Lady|1964|  7.587|
|      Doctor Zhivago|1965| 4.9304|
|Who's Afraid of V...|1966|11.1111|
|     The Dirty Dozen|1967| 13.388|
|        The Love Bug|1968|13.4383|
+--------------------+----+-------+
only showing top 20 rows

