# Examen ETL: SPARK 02/02

Se podrá utilizar toda la información que se encuentra en el campus. 

El fichero de datos sobre el que se trabajará es el de partidosLigaNBA.txt.

A cada una de las preguntas hay que responder explicando brevemente que se pretende hacer antes de lanzar el código.

Al documento lo llamareís con vuestro nombre y apellido. Debeís enviarlo a mi correo de CUNEF antes del final del examen.

El lenguaje para trabajar con Spark podrá ser python o R indistintamente.

## Primera pregunta: Describe brevemente que diferencia el persists, cache y collect en spark. Explica brevemente casos en los que es interesante su aplicación

Persist <- te permite almacenar los datos tanto en la memoria
Cache <- te permite guardar los datos en el caché
Y collect es una acción de spark que lo que hace es sacar los resultados, equivale a take

## Segunda pregunta: Explica brevemente los pasos a seguir para realizar la carga de un conjunto de datos (pasos que se siguieron en la práctica con datos de logs)

Primero lo que tenemos que realizar es la carga de las librerías pertinentes y necesarias para hacer la prácica, luego los 
contextos necesarios que necesitemos. Más tarde leemos los datos con la ruta, los introducimos en nuestro contexto con sc().

## Tercera Pregunta: Índica un tipo de problema que puede empeorar los datos. (pe. Que no exista un representante del CDO en todas las áreas de negocio), pon algún ejemplo específico (pe. Datos duplicados) y cómo lo tratarías con técnicas de data cleaning.

Por ejemplo que existan fechas y no estén en tipo fecha, que haya datos faltantes, o que haya filas o columnas que no contengan información
Y lo único que nos hagan sea distorsionarnos el análisis, que haya variables string que deberían de ser numéricas, que las variables no estén escaladas para la realización de modelos...

## Cuarta tarea: Inicializar spark context y cargar los datos desde el fichero.

In [5]:
import datetime as dt
import re
from datetime import *
import os
import pandas as pd
from time import time

from pyspark import SparkContext
from pyspark.sql import *

In [2]:
sc = SparkContext()

In [6]:
sqlCont = SQLContext(sc)

In [8]:
datos = './partidosLigaNBA.csv'
raw_data = sc.textFile(datos)

In [9]:
raw_data.take(5)

['Date:Start..ET.:Visitor.Neutral:PTS:Home.Neutral:PTS.1',
 'Tue, Oct 30, 2007:"7:30 pm":Utah Jazz:117:Golden State Warriors:96',
 'Tue, Oct 30, 2007:"7:30 pm":Houston Rockets:95:Los Angeles Lakers:93',
 'Tue, Oct 30, 2007:"7:00 pm":Portland Trail Blazers:97:San Antonio Spurs:106',
 'Wed, Oct 31, 2007:"8:00 pm":Dallas Mavericks:92:Cleveland Cavaliers:74']

In [10]:
data_clean = raw_data.map(lambda x:x.split(':'))
data_clean.take(5)

[['Date', 'Start..ET.', 'Visitor.Neutral', 'PTS', 'Home.Neutral', 'PTS.1'],
 ['Tue, Oct 30, 2007',
  '"7',
  '30 pm"',
  'Utah Jazz',
  '117',
  'Golden State Warriors',
  '96'],
 ['Tue, Oct 30, 2007',
  '"7',
  '30 pm"',
  'Houston Rockets',
  '95',
  'Los Angeles Lakers',
  '93'],
 ['Tue, Oct 30, 2007',
  '"7',
  '00 pm"',
  'Portland Trail Blazers',
  '97',
  'San Antonio Spurs',
  '106'],
 ['Wed, Oct 31, 2007',
  '"8',
  '00 pm"',
  'Dallas Mavericks',
  '92',
  'Cleveland Cavaliers',
  '74']]

In [11]:
header = data_clean.first()

In [12]:
data_clean = raw_data.map(lambda x:x.split(':')).filter(lambda x:'Playoffs' not in x[0]).filter(lambda x: x != header)
data_clean.take(5)

[['Tue, Oct 30, 2007',
  '"7',
  '30 pm"',
  'Utah Jazz',
  '117',
  'Golden State Warriors',
  '96'],
 ['Tue, Oct 30, 2007',
  '"7',
  '30 pm"',
  'Houston Rockets',
  '95',
  'Los Angeles Lakers',
  '93'],
 ['Tue, Oct 30, 2007',
  '"7',
  '00 pm"',
  'Portland Trail Blazers',
  '97',
  'San Antonio Spurs',
  '106'],
 ['Wed, Oct 31, 2007',
  '"8',
  '00 pm"',
  'Dallas Mavericks',
  '92',
  'Cleveland Cavaliers',
  '74'],
 ['Wed, Oct 31, 2007',
  '"8',
  '30 pm"',
  'Seattle SuperSonics',
  '103',
  'Denver Nuggets',
  '120']]

## Quinta tarea: Media de la diferencia de puntos por año

In [54]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
import datetime as dt

In [85]:
data_frame = data_clean.map(lambda x:Row(fecha = dt.datetime.strptime((x[0]),'%a, %b %d, %Y').strftime('%m/%d/%Y'),
                                         hora = (x[1] +' : '+ x[2]),
                                         equipo_residente = x[5],
                                         equipo_visitante = x[3],
                                         puntos_visitante=int(x[4]),
                                         puntos_local=int(x[6])))



In [102]:
df = sqlCont.createDataFrame(data_frame)
df.registerTempTable('interactions')

In [103]:
df.show(5)

+--------------------+--------------------+----------+-----------+------------+----------------+
|    equipo_residente|    equipo_visitante|     fecha|       hora|puntos_local|puntos_visitante|
+--------------------+--------------------+----------+-----------+------------+----------------+
|Golden State Warr...|           Utah Jazz|10/30/2007|"7 : 30 pm"|          96|             117|
|  Los Angeles Lakers|     Houston Rockets|10/30/2007|"7 : 30 pm"|          93|              95|
|   San Antonio Spurs|Portland Trail Bl...|10/30/2007|"7 : 00 pm"|         106|              97|
| Cleveland Cavaliers|    Dallas Mavericks|10/31/2007|"8 : 00 pm"|          74|              92|
|      Denver Nuggets| Seattle SuperSonics|10/31/2007|"8 : 30 pm"|         120|             103|
+--------------------+--------------------+----------+-----------+------------+----------------+
only showing top 5 rows



In [104]:
df = df.withColumn('abs', abs(df.puntos_local - df.puntos_visitante)).show(2)


+--------------------+----------------+----------+-----------+------------+----------------+---+
|    equipo_residente|equipo_visitante|     fecha|       hora|puntos_local|puntos_visitante|abs|
+--------------------+----------------+----------+-----------+------------+----------------+---+
|Golden State Warr...|       Utah Jazz|10/30/2007|"7 : 30 pm"|          96|             117| 21|
|  Los Angeles Lakers| Houston Rockets|10/30/2007|"7 : 30 pm"|          93|              95|  2|
+--------------------+----------------+----------+-----------+------------+----------------+---+
only showing top 2 rows



Por RDD

In [13]:
media_rdd = data_clean.map(lambda x: (x[0][len(x)-11:18], __builtin__.abs(int(x[4]) - int(x[6]))))

miTupla = (0, 0)
media_rdd.aggregateByKey(miTupla, lambda a,b: (a[0] + b,    a[1] + 1),
                                 lambda a,b: (a[0] + b[0], a[1] + b[1]))\
                                .mapValues(lambda x: x[0]/x[1])\
                                .sortBy(lambda x: x[0], False)\
                                .collect()

[('2017', 11.422166874221668),
 ('2016', 11.550637659414853),
 ('2015', 11.159969673995452),
 ('2014', 10.9047976011994),
 ('2013', 11.071752265861027),
 ('2012', 10.845318860244234),
 ('2011', 10.661016949152541),
 ('2010', 10.86903860711582),
 ('2009', 11.090425531914894),
 ('2008', 11.543543543543544),
 ('2007', 11.096491228070175)]

## Sexta tarea: ¿Han judado todos los equipos el mismo número de partidos? ¿ Si es qué no a que puede deberse?

In [16]:
from operator import add

In [19]:
equipos_visiantes = data_clean.map(lambda y: (y[3], 1))
equipos_locales = data_clean.map(lambda y: (y[5], 1))

agrupados_locales = equipos_locales.reduceByKey(add).sortBy(lambda x: x[1], False).collect()
agrupados_visitantes = equipos_visiantes.reduceByKey(add).sortBy(lambda x: x[1], False).collect()

In [18]:
agrupados_locales

[('San Antonio Spurs', 467),
 ('Boston Celtics', 467),
 ('Miami Heat', 461),
 ('Los Angeles Lakers', 450),
 ('Cleveland Cavaliers', 449),
 ('Atlanta Hawks', 448),
 ('Golden State Warriors', 445),
 ('Chicago Bulls', 436),
 ('Houston Rockets', 434),
 ('Indiana Pacers', 434),
 ('Memphis Grizzlies', 433),
 ('Dallas Mavericks', 431),
 ('Orlando Magic', 431),
 ('Los Angeles Clippers', 431),
 ('Toronto Raptors', 426),
 ('Portland Trail Blazers', 425),
 ('Denver Nuggets', 424),
 ('Utah Jazz', 422),
 ('Washington Wizards', 421),
 ('Philadelphia 76ers', 416),
 ('Detroit Pistons', 415),
 ('Milwaukee Bucks', 413),
 ('Phoenix Suns', 412),
 ('New York Knicks', 412),
 ('Oklahoma City Thunder', 410),
 ('Sacramento Kings', 402),
 ('Minnesota Timberwolves', 402),
 ('Charlotte Bobcats', 283),
 ('New Orleans Hornets', 250),
 ('Brooklyn Nets', 217),
 ('New Jersey Nets', 197),
 ('New Orleans Pelicans', 166),
 ('Charlotte Hornets', 126),
 ('Seattle SuperSonics', 41)]

In [20]:
agrupados_visitantes

[('San Antonio Spurs', 466),
 ('Boston Celtics', 463),
 ('Miami Heat', 456),
 ('Cleveland Cavaliers', 452),
 ('Atlanta Hawks', 448),
 ('Los Angeles Lakers', 447),
 ('Golden State Warriors', 440),
 ('Chicago Bulls', 437),
 ('Dallas Mavericks', 436),
 ('Indiana Pacers', 434),
 ('Houston Rockets', 434),
 ('Memphis Grizzlies', 434),
 ('Orlando Magic', 432),
 ('Los Angeles Clippers', 430),
 ('Portland Trail Blazers', 428),
 ('Utah Jazz', 424),
 ('Toronto Raptors', 424),
 ('Denver Nuggets', 424),
 ('Washington Wizards', 423),
 ('Philadelphia 76ers', 418),
 ('Milwaukee Bucks', 414),
 ('Detroit Pistons', 414),
 ('Phoenix Suns', 413),
 ('New York Knicks', 413),
 ('Oklahoma City Thunder', 408),
 ('Sacramento Kings', 402),
 ('Minnesota Timberwolves', 402),
 ('Charlotte Bobcats', 283),
 ('New Orleans Hornets', 249),
 ('Brooklyn Nets', 218),
 ('New Jersey Nets', 197),
 ('New Orleans Pelicans', 166),
 ('Charlotte Hornets', 127),
 ('Seattle SuperSonics', 41)]

Como puede verse no han jugado los mismos numeros de partidos de local y visitante

## Séptima pregunta: ¿Cuantos partidos ha ganado en Enero Clevelant?

In [24]:
cleve_vis = data_clean.filter(lambda x:x[3] == 'Cleveland Cavaliers').filter(lambda x:'Jan' in x[0]).filter(lambda x: int(x[4])>int(x[6]))
cleve_vis.count()

41

In [28]:
cleve_local = data_clean.filter(lambda x:x[5] == 'Cleveland Cavaliers').filter(lambda x:'Jan' in x[0]).filter(lambda x: int(x[4])<int(x[6]))
cleve_local.count()

42

In [29]:
# Sumar para tener el ejercicio completo, como podemos observar tenemos 41 + 42

cleve_local.count() + cleve_vis.count()

83

## Octava pregunta: ¿Los Warrios son mejores fuera de casa o en casa?

In [206]:
victorias_warrior_local = data_clean.filter(lambda x:x[3] == 'Golden State Warriors').filter(lambda x: int(x[4])>int(x[6])).map(lambda x: (x[3],1)).reduceByKey(add).sortByKey().take(1)

In [207]:
victorias_Warrior_visitante = data_clean.filter(lambda x:x[5] == 'Golden State Warriors').filter(lambda x:int(x[6])>int(x[4])).map(lambda x : (x[5],1)).reduceByKey(add).sortByKey().take(1)

In [208]:
victorias_warrior_local + victorias_Warrior_visitante

[('Golden State Warriors', 215), ('Golden State Warriors', 308)]

Como podemos observar, las victorias de Warriors fuera de casa son mayores que en casa, 308 > 215

## Novena pregunta: Equipo que ha quedado primerio en victorias más temporadas. (si es que hay alguno que más)

##  Décima pregunta: Escribe la expresión regular correcta que sólo macheen los teléfonos y el correo del siguiente texto.

Si eres cliente y necesitas información sobre tus posiciones, productos o realizar operaciones: Desde España. Desde el extranjero. Banca telefónica en castellano. Bandera castellano. 902 13 23 13. Banca telefónica en catalán. Bandera catalana. 902 88 30 08. Banca telefónica en inglés. Bandera inglesa. 902 88 88 35. O por correo electrónico a atencioncliente@bankinter.com