# Tarea PySpark

### Objetivo:
 Analizar la eficiencia de los jugadores en términos generales y por posición, así como determinar la contribución al equipo por jugador tomando en cuenta los datos obtenidos

Usarás la base de datos del archivo 'fusbol.csv' para obtener tus datos. Checa la estructura del archivo para ver si es necesario limpiar la informacion, ver su estructura y así sea más fácil completar la tarea. Besos.

### Ejercicio 1:
Carga la base de datos en un DataFrame de Pyspark (con 2 nucleos). Valida los rangos de los valores donde sea aplicable, así como su corrección (en caso de ser necesaria). 
Después, utilizando las variables más relevantes como 'Ast/90', 'PassCmp%', etc., concluye qué ligas tienen los mejores jugadores por posición. Es decir, si los mejores jugadores defensas son de la liga francesa, inglesa, etc., por ejemplo.
Como cada persona tiene una definición de "mejor", utiliza las siguientes metricas por posición:
- Delanteros: npG+A/90 y npxG+xA/90
- Medios: KeyPass/90 y PassCmp%
- Defensas: PressSucc% y Interceptions/90

Con los resultados obtenidos, grafica por posición para que tu conclusión tenga un respaldo visual también.

In [None]:
import findspark
findspark.init()
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window

In [None]:
spark = SparkSession.builder.appName('firstSession')\
    .config('spark.master', 'local[2]')\
    .config('spark.executor.memory', '1g')\
    .config("spark.sql.shuffle.partitions", 10)\
    .config('spark.driver.memory','1g')\
    .getOrCreate()

In [None]:
spark

In [None]:
df_c = spark.read.csv('fusbol.csv', header=True, inferSchema=True)
df_c.show()

In [None]:
df_c.printSchema()

In [None]:
df_c.count()

In [None]:
df_c= df_c.dropna()

In [None]:
df_c.count()

### Delanteros 

In [None]:
#Filtro para solo tener delanteros

delanteros_df = df_c.filter(col("Pos").like("%FW%"))
#Confirmo la respuesta
delanteros_df.select("Pos").show()

# Agrupar por la columna 'Comp' y promediar los valores de 'npG+A/90'
avg_npG = delanteros_df.groupBy("Comp").agg(avg("npG+A/90").alias("npG+A/90")).orderBy(desc("npG+A/90"))
# Mostrar el resultado
avg_npG.show()
#Agrupar por la columna 'Comp' y promediar los valores de 'npG+A/90'
avg_npxG = delanteros_df.groupBy("Comp").agg(avg("npxG+xA/90").alias("npxG+xA/90")).orderBy(desc("npxG+xA/90"))
avg_npxG.show()

### Graficas

In [None]:
# Convierte los DataFrames de Spark a Pandas DataFrames
avg_npG_pd = avg_npG.toPandas()
avg_npxG_pd = avg_npxG.toPandas()

In [None]:
#Grafica de avg_npG_pd
plt.figure(figsize=(12, 7))
sns.barplot(x='Comp', y='npG+A/90', data=avg_npG_pd, palette='viridis')
plt.title('Promedio de Goles y Asistencias No Penales por 90 Minutos por Competencia')
plt.xlabel('Competencia')
plt.ylabel('Goles y Asistencias No Penales por 90 Minutos')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()


In [None]:
#Grafica de avg_npxG_pd
plt.figure(figsize=(12, 7))
sns.barplot(x='Comp', y='npxG+xA/90', data=avg_npxG_pd, palette='viridis')
plt.title('Promedio de Goles y Asistencias Esperadas No Penales por 90 Minutos por Competencia')
plt.xlabel('Competencia')
plt.ylabel('Goles y Asistencias Esperadas No Penales por 90 Minutos')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()


### Medios

In [None]:
#Filtro para solo tener medios

medios_df = df_c.filter(col("Pos").like("%MF%"))
#Confirmo la respuesta
medios_df.select("Pos").show()

# Agrupar por la columna 'Comp' y promediar los valores de 'KeyPass/90'
keyPass = medios_df.groupBy("Comp").agg(avg("KeyPass/90").alias("KeyPass/90")).orderBy(desc("KeyPass/90"))
# Mostrar el resultado
keyPass.show()
#Agrupar por la columna 'Comp' y promediar los valores de 'PassCmp%'
passCmp = medios_df.groupBy("Comp").agg(avg("PassCmp%").alias("PassCmp%")).orderBy(desc("PassCmp%"))
passCmp.show()

### Graficas

In [None]:
# Convertir a Pandas DataFrames
keyPass_pd = keyPass.toPandas()
passCmp_pd = passCmp.toPandas()

In [None]:
# Gráfico para Key Passes por 90 minutos
plt.figure(figsize=(12, 7))
sns.barplot(x='Comp', y='KeyPass/90', data=keyPass_pd, palette='magma')
plt.title('Promedio de Pases Clave por 90 Minutos por Competencia')
plt.xlabel('Competencia')
plt.ylabel('Pases Clave por 90 Minutos')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

In [None]:
# Gráfico para Porcentaje de Pases Completados
plt.figure(figsize=(12, 7))
sns.barplot(x='Comp', y='PassCmp%', data=passCmp_pd, palette='magma')
plt.title('Promedio de Porcentaje de Pases Completados por Competencia')
plt.xlabel('Competencia')
plt.ylabel('Porcentaje de Pases Completados')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

### Defensa 

In [None]:
#Filtro para solo tener defensas

defensas_df = df_c.filter(col("Pos").like("%DF%"))
#Confirmo la respuesta
defensas_df.select("Pos").show()

# Agrupar por la columna 'Comp' y promediar los valores de 'PressSucc%"'
pressSucc = defensas_df.groupBy("Comp").agg(avg("PressSucc%").alias("PressSucc%")).orderBy(desc("PressSucc%"))
# Mostrar el resultado
pressSucc.show()
#Agrupar por la columna 'Comp' y promediar los valores de 'Interceptions/90'
interceptions = defensas_df.groupBy("Comp").agg(avg("Interceptions/90").alias("Interceptions/90")).orderBy(desc("Interceptions/90"))
interceptions.show()

### Graficas 

In [None]:
# Convierte los DataFrames de Spark a Pandas DataFrames
interceptions_pd = interceptions.toPandas()
pressSucc_pd = pressSucc.toPandas()

In [None]:
#Grafica de press_Succ
plt.figure(figsize=(10, 6))
sns.barplot(x='Comp', y='PressSucc%', data=pressSucc_pd)
plt.title('Porcentaje de Éxito en Presión por Competencia')
plt.xlabel('Competencia')
plt.ylabel('Porcentaje de Éxito en Presión')
plt.xticks(rotation=45)
plt.show()

In [None]:
#Grafica de Interceptions
plt.figure(figsize=(10, 6))
sns.barplot(x='Comp', y='Interceptions/90', data=interceptions_pd)
plt.title('Promedio de Intercepciones por 90 minutos por Competencia')
plt.xlabel('Competencia')
plt.ylabel('Intercepciones por 90 minutos')
plt.xticks(rotation=45)
plt.show()

### Nota importante 
Considerar el promedio para entender cual es la liga con mejores jugadores puede ser un error estadistico. En este caso estamos considerando la liga que en promedio tiene a los mejores jugadores. Esto no significa que todos los mejores jugadores pertenescan a esta liga. 


### Ejercicio 2:

Ahora hagamos algo un poco más interesante. Escoge algún jugador de todos los disponibles y toma 5 metricas, las que quieras. Debes concluir en qué percentil se encuentra el jugador en esas métricas que escogiste. Obviamente, vas a comparar sus valores con todos los demás con los que comparte posición y liga, para no tener un sesgo y que la información no pierda robustez. Por último, genera un DataFrame de Pyspark con todos los datos solicitados. Muestra el DataFrame y conviertelo a otro de tipo pandas. Muestra los dos.

In [None]:
# Filtrar para encontrar jugadores cuyo nombre incluya 'Timo' (en cualquier combinación de mayúsculas y minúsculas)
#Intento buscar al GOAT Timo Werner 
players_with_timo = df_c.filter(lower(col("Player")).like("%timo%"))
players_with_timo.show()

In [None]:
#Filtro para solo tener jugadores de su pos y su liga
fw_eng_prem_df = df_c.filter((col("Pos").like("%FW%")) & (col("Comp") == "eng Premier League"))
fw_eng_prem_df.show()

### Nota 
Aqui estamos viendo el Percentil del jugador, el promedio de la liga, el maximo, el minimo, la desviacion en esa estadistica. Esto nos ayuda a comprobar los resultados más adelante

In [None]:

metrics = ["npG+A/90","npxG+xA/90", "Shots/90", "SoT%", "AvgShotDist", "PassAtt/90"]

# Iterar sobre cada métrica para realizar análisis
for metric in metrics:
    # Definir la ventana sobre la que se calculará el percentil
    windowSpec = Window.orderBy(col(metric).desc())

    # Calcular el percentil para cada jugador
    fw_eng_prem_df = fw_eng_prem_df.withColumn(f"Percentile_{metric}", percent_rank().over(windowSpec))

    # Buscar un jugador específico y ver su percentil
    specific_player = fw_eng_prem_df.filter(col("Player").like("%Timo Werner%"))
    specific_player.select("Player", metric, f"Percentile_{metric}").show()

    # Calcular y mostrar el promedio
    average_metric = fw_eng_prem_df.agg(avg(metric).alias(f"Average_{metric}"))
    average_metric.show()

    # Calcular y mostrar estadísticas básicas
    stats = fw_eng_prem_df.agg(
        max(metric).alias("Max"),
        min(metric).alias("Min"),
        stddev(metric).alias("Standard Deviation")
    )
    stats.show()


### Nota
Cuando usas el método append() en la lista results en el contexto del código de PySpark que has proporcionado, estás agregando un DataFrame a la lista. Cada elemento de la lista results es un DataFrame de Spark que contiene los resultados específicos para "Timo Werner" de una métrica particular.

In [None]:
# Asumiendo que fw_eng_prem_df ya está cargado y definido
metrics = ["npG+A/90", "npxG+xA/90", "Shots/90", "SoT%", "AvgShotDist", "PassAtt/90"]

# Lista para acumular los resultados de cada métrica
results = []

# Iterar sobre cada métrica para realizar análisis
for metric in metrics:
    windowSpec = Window.orderBy(col(metric).desc())
    fw_eng_prem_df = fw_eng_prem_df.withColumn(f"Percentile_{metric}", percent_rank().over(windowSpec))

    # Filtrar por un jugador específico y seleccionar los datos necesarios
    specific_player = fw_eng_prem_df.filter(col("Player").like("%Timo Werner%"))
    specific_player_data = specific_player.select("Player", lit(metric).alias("Metric"), col(f"Percentile_{metric}").alias("Percentile"))

    # Agregar los resultados al listado
    results.append(specific_player_data)

# Concatenar todos los DataFrames en la lista en un único DataFrame
final_result_df = results[0]
for df in results[1:]:
    final_result_df = final_result_df.union(df)

# Mostrar el DataFrame final con todos los percentiles
final_result_df.show()
final_result_pd = final_result_df.toPandas()
print(final_result_pd)