[Kaggle dataset](https://www.kaggle.com/martinellis/nhl-game-data)

[Medium](https://towardsdatascience.com/a-brief-introduction-to-pyspark-ff4284701873)

[Documentation](https://spark.apache.org/docs/2.3.0/api/python/pyspark.sql.html#pyspark.sql.SparkSession)

In [10]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession

# Create (or reuse) the SparkSession
spark = SparkSession \
    .builder \
    .appName("prueba_sql") \
    .getOrCreate()

The CSV file is read into a Spark DataFrame and then written out as *Parquet*, a columnar file format that stores each column's data type in its metadata, supports compression, and is designed to work well with Spark.
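CSV stores only text, which is why the CSV reader in the next cell needs `inferSchema`: Spark makes an extra pass over the data to guess each column's type, whereas Parquet writes the schema into the file itself. A minimal standard-library illustration of the CSV side, on a toy row; the `infer` helper is hypothetical and far simpler than Spark's own inference:

```python
import csv
import io

# A tiny in-memory CSV with a header row (toy values, not real NHL data).
raw = "game_id,player_id,goals\n2010020458,8471214,2\n"
rows = list(csv.DictReader(io.StringIO(raw)))

# CSV carries no type information: every field is read back as a string.
print({name: type(value).__name__ for name, value in rows[0].items()})
# {'game_id': 'str', 'player_id': 'str', 'goals': 'str'}


def infer(value: str):
    """Hypothetical stand-in for type inference: try int, then float, else keep str."""
    for cast in (int, float):
        try:
            return cast(value)
        except ValueError:
            pass
    return value


# Guessing the types requires looking at every value, i.e. an extra pass.
typed = [{key: infer(value) for key, value in row.items()} for row in rows]
print(typed[0])
```

With Parquet there is nothing to guess: the reader takes the column types straight from the file's metadata.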

In [13]:
file_location = '/home/jovyan/work/Spark/spark-warehouse/game_skater_stats.csv'
df = spark.read.format("csv") \
    .option("inferSchema", True) \
    .option("header", True) \
    .load(file_location)
display(df)

DataFrame[game_id: int, player_id: int, team_id: int, timeOnIce: int, assists: int, goals: int, shots: int, hits: int, powerPlayGoals: int, powerPlayAssists: int, penaltyMinutes: int, faceOffWins: int, faceoffTaken: int, takeaways: int, giveaways: int, shortHandedGoals: int, shortHandedAssists: int, blocked: int, plusMinus: int, evenTimeOnIce: int, shortHandedTimeOnIce: int, powerPlayTimeOnIce: int]

In [14]:
df.write.save('/home/jovyan/work/Spark/spark-warehouse/game_skater_stats',  
               format='parquet')
df = spark.read.load("/home/jovyan/work/Spark/spark-warehouse/game_skater_stats")
display(df)

DataFrame[game_id: int, player_id: int, team_id: int, timeOnIce: int, assists: int, goals: int, shots: int, hits: int, powerPlayGoals: int, powerPlayAssists: int, penaltyMinutes: int, faceOffWins: int, faceoffTaken: int, takeaways: int, giveaways: int, shortHandedGoals: int, shortHandedAssists: int, blocked: int, plusMinus: int, evenTimeOnIce: int, shortHandedTimeOnIce: int, powerPlayTimeOnIce: int]

In [22]:
names = spark.read.format("CSV").option("inferSchema", True).option("header", True).load('/home/jovyan/work/Spark/spark-warehouse/player_info.csv')

### Queries
First, the DataFrame is registered as a temporary view so it can be queried with SQL.

In [18]:
df.createOrReplaceTempView("stats")

(spark.sql("""
  select player_id, sum(1) as games, sum(goals) as goals
  from stats
  group by 1
  order by 3 desc
  limit 5
""")).show()

+---------+-----+-----+
|player_id|games|goals|
+---------+-----+-----+
|  8471214|  788|  434|
|  8474564|  655|  342|
|  8474141|  748|  311|
|  8475166|  700|  308|
|  8470794|  782|  305|
+---------+-----+-----+
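The aggregation above relies on each row of `stats` being one player in one game, so `sum(1)` counts games (equivalent to `count(*)`). To try the same query shape without a cluster, here is a sketch that uses Python's built-in `sqlite3` as a stand-in for Spark SQL, on made-up rows:

```python
import sqlite3

# In-memory stand-in for the "stats" view: one row per player per game (toy values).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE stats (game_id INTEGER, player_id INTEGER, goals INTEGER)")
conn.executemany(
    "INSERT INTO stats VALUES (?, ?, ?)",
    [
        (1, 100, 2), (2, 100, 1), (3, 100, 0),  # player 100: 3 games, 3 goals
        (1, 200, 1), (2, 200, 1),               # player 200: 2 games, 2 goals
        (1, 300, 0),                            # player 300: 1 game, 0 goals
    ],
)

# Same shape as the Spark SQL query: one row = one game, so SUM(1) counts games;
# ORDER BY 3 refers to the third output column (goals).
top = conn.execute(
    """
    SELECT player_id, SUM(1) AS games, SUM(goals) AS goals
    FROM stats
    GROUP BY player_id
    ORDER BY 3 DESC
    LIMIT 2
    """
).fetchall()
print(top)  # [(100, 3, 3), (200, 2, 2)]
```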



In [40]:
df.columns

['game_id',
 'player_id',
 'team_id',
 'timeOnIce',
 'assists',
 'goals',
 'shots',
 'hits',
 'powerPlayGoals',
 'powerPlayAssists',
 'penaltyMinutes',
 'faceOffWins',
 'faceoffTaken',
 'takeaways',
 'giveaways',
 'shortHandedGoals',
 'shortHandedAssists',
 'blocked',
 'plusMinus',
 'evenTimeOnIce',
 'shortHandedTimeOnIce',
 'powerPlayTimeOnIce']

The result of a query can be assigned to a *DataFrame* and registered as a new temporary view.

In [33]:
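# Players with the most goals, with their games played
top_players = spark.sql("""
  select player_id, sum(1) as games,
  sum(goals) as goals
  from stats
  group by 1
  order by 3 desc
  limit 5
""")
# show() only prints the rows and returns None, so keep it out of the assignment
top_players.show()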

+---------+-----+-----+
|player_id|games|goals|
+---------+-----+-----+
|  8471214|  788|  434|
|  8474564|  655|  342|
|  8474141|  748|  311|
|  8475166|  700|  308|
|  8470794|  782|  305|
+---------+-----+-----+



In [32]:
# Register the temporary views
top_players.createOrReplaceTempView("top_players")
names.createOrReplaceTempView("names")
# Query the new views
(spark.sql("""
select p.player_id, goals, firstName, lastName
from top_players p
join names n
  on p.player_id = n.player_id
order by 2 desc  
""")).show()

+---------+-----+---------+--------+
|player_id|goals|firstName|lastName|
+---------+-----+---------+--------+
|  8471214|  434|     Alex|Ovechkin|
|  8474564|  342|   Steven| Stamkos|
|  8474141|  311|  Patrick|    Kane|
|  8475166|  308|     John| Tavares|
|  8470794|  305|      Joe|Pavelski|
+---------+-----+---------+--------+
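Registering a query result as a view and joining it against another table is ordinary SQL. A sketch of the same pattern with `sqlite3` as a stand-in for Spark, where `CREATE TEMP VIEW` plays roughly the role of `createOrReplaceTempView`; the players and names are invented:

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# Toy versions of the two sources: per-game stats and player names.
conn.execute("CREATE TABLE stats (player_id INTEGER, goals INTEGER)")
conn.executemany(
    "INSERT INTO stats VALUES (?, ?)",
    [(100, 2), (100, 1), (200, 1), (300, 0)],
)
conn.execute("CREATE TABLE names (player_id INTEGER, firstName TEXT, lastName TEXT)")
conn.executemany(
    "INSERT INTO names VALUES (?, ?, ?)",
    [(100, "Ada", "Example"), (200, "Bo", "Sample"), (300, "Cy", "Demo")],
)

# Save the aggregate under a name, analogous to top_players.createOrReplaceTempView(...).
conn.execute(
    """
    CREATE TEMP VIEW top_players AS
    SELECT player_id, SUM(goals) AS goals
    FROM stats
    GROUP BY player_id
    ORDER BY 2 DESC
    LIMIT 2
    """
)

# Join the view to the names table, as in the Spark SQL cell.
rows = conn.execute(
    """
    SELECT p.player_id, p.goals, n.firstName, n.lastName
    FROM top_players p
    JOIN names n ON p.player_id = n.player_id
    ORDER BY 2 DESC
    """
).fetchall()
print(rows)  # [(100, 3, 'Ada', 'Example'), (200, 1, 'Bo', 'Sample')]
```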



In [41]:
spark.sql('''
SELECT game_id from stats
LIMIT 5
''').show()

# game_id packs the season, the game type and the game number;
# the next cell slices it with substring and joins the parts with '-'

+----------+
|   game_id|
+----------+
|2010020458|
|2010020458|
|2010020458|
|2010020458|
|2010020458|
+----------+



In [42]:
spark.sql("""
select cast(substring(game_id, 1, 4) || '-' 
  || substring(game_id, 5, 2) || '-01' as Date) as month
  , sum(goals)/count(distinct game_id) as goals_per_goal
from stats
group by 1
order by 1
""").show(3)

+----------+------------------+
|     month|    goals_per_goal|
+----------+------------------+
|2010-02-01| 5.464227642276422|
|2010-03-01| 5.606741573033708|
|2011-02-01|5.3203252032520325|
+----------+------------------+
only showing top 3 rows
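Under the ID convention commonly documented for the NHL stats API (an assumption, not stated in the dataset description), a `game_id` such as `2010020458` splits into season start year `2010`, game type `02` and game number `0458`, which is why the grouping above only yields `-02-` and `-03-` values. A pure-Python sketch of that decoding and of the goals-per-game ratio `sum(goals)/count(distinct game_id)`, on toy rows; `decode_game_id` and `GAME_TYPES` are hypothetical helpers:

```python
from collections import defaultdict
from typing import Dict, Set, Tuple

# Game-type codes as commonly documented for the NHL stats API (assumption).
GAME_TYPES = {"01": "preseason", "02": "regular season", "03": "playoffs", "04": "all-star"}


def decode_game_id(game_id: int) -> Tuple[int, str, int]:
    """Split a 10-digit game_id into (season start year, game type, game number)."""
    s = str(game_id)
    return int(s[:4]), GAME_TYPES.get(s[4:6], "unknown"), int(s[6:])


print(decode_game_id(2010020458))  # (2010, 'regular season', 458)

# Toy (game_id, goals) rows: one row per skater per game.
rows = [(2010020458, 2), (2010020458, 1), (2010020459, 4), (2010030001, 3)]

goals: Dict[Tuple[int, str], int] = defaultdict(int)
games: Dict[Tuple[int, str], Set[int]] = defaultdict(set)
for game_id, g in rows:
    season, game_type, _ = decode_game_id(game_id)
    goals[(season, game_type)] += g              # sum(goals)
    games[(season, game_type)].add(game_id)      # count(distinct game_id)

per_game = {key: goals[key] / len(games[key]) for key in goals}
print(per_game)  # {(2010, 'regular season'): 3.5, (2010, 'playoffs'): 3.0}
```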



### MLlib: Linear Regression
The model predicts how many goals a player scores in a game from that game's shots, hits, assists, penalty minutes, time on ice and takeaways.

In [43]:
# MLlib imports
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

# Create a vector representation for features
assembler = VectorAssembler(inputCols=['shots', 'hits', 'assists', 
    'penaltyMinutes','timeOnIce','takeaways'],outputCol="features")
train_df = assembler.transform(df)

# Fit a linear regression model
lr = LinearRegression(featuresCol = 'features', labelCol='goals')
lr_model = lr.fit(train_df)

# Output statistics 
trainingSummary = lr_model.summary
print("Coefficients: " + str(lr_model.coefficients))
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("R2: %f" % trainingSummary.r2)

Coefficients: [0.09353643668143533,-0.006632536309343128,0.005487355356078049,-0.00017779234579156684,-4.317801186031882e-05,0.017566692293079462]
RMSE: 0.378003
R2: 0.126676
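With `regParam` left at its default of 0, Spark's `LinearRegression` fits an ordinary least-squares model with an intercept, and `summary` reports RMSE and R² on the training rows. A NumPy sketch of the same quantities, checked on synthetic data with a known answer; the `ols_fit` helper is hypothetical and not part of MLlib:

```python
import numpy as np


def ols_fit(X: np.ndarray, y: np.ndarray):
    """Ordinary least squares with an intercept; returns (intercept, coefs, rmse, r2)."""
    A = np.column_stack([np.ones(len(X)), X])      # prepend a column of ones for the intercept
    beta, *_ = np.linalg.lstsq(A, y, rcond=None)   # minimizes ||A @ beta - y||^2
    pred = A @ beta
    rmse = np.sqrt(np.mean((y - pred) ** 2))       # root of the mean squared residual
    r2 = 1 - np.sum((y - pred) ** 2) / np.sum((y - y.mean()) ** 2)
    return beta[0], beta[1:], rmse, r2


# Synthetic data: y = 1 + 2*x1 - 0.5*x2 exactly, so the fit should recover these values.
rng = np.random.default_rng(0)
X = rng.normal(size=(50, 2))
y = 1 + 2 * X[:, 0] - 0.5 * X[:, 1]

intercept, coefs, rmse, r2 = ols_fit(X, y)
print(intercept, coefs, rmse, r2)
```

Read against the output above, R² ≈ 0.127 means the six features account for roughly 13% of the variance in per-game goals, so on their own they are a weak predictor.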


In [48]:
# All games for a single player (8471214, Alex Ovechkin in the table above)
sample_pd = spark.sql("""
  select * from stats
  where player_id = 8471214
""").toPandas()

# Import python libraries
from scipy.optimize import leastsq
import numpy as np

# Residuals of a straight-line model: y ≈ params[0] + params[1] * x
def fit(params, x, y):
    return (y - (params[0] + x * params[1]))
# Fit hits (y) against shots (x), starting from intercept 1 and slope 0
result = leastsq(fit, [1, 0],
                 args=(sample_pd.shots, sample_pd.hits))
print(result)

(array([ 2.98572669, -0.02376583]), 3)
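`leastsq` minimizes the sum of squares of the residual vector that `fit` returns, so the array holds the intercept and slope of hits regressed on shots for that player (about 2.99 hits at zero shots, changing by about -0.024 per shot), and the trailing `3` is the `ier` status flag, where values 1 to 4 mean a solution was found. For a straight line the minimizer also has a closed form; a NumPy sketch on toy values (not the player's real numbers), cross-checked against `np.polyfit`, with `line_fit` as a hypothetical helper:

```python
import numpy as np


def line_fit(x, y):
    """Closed-form least-squares line y ≈ a + b*x; returns (a, b)."""
    x = np.asarray(x, dtype=float)
    y = np.asarray(y, dtype=float)
    # Slope = covariance(x, y) / variance(x); the intercept makes the line pass through the means.
    b = np.sum((x - x.mean()) * (y - y.mean())) / np.sum((x - x.mean()) ** 2)
    a = y.mean() - b * x.mean()
    return a, b


# Toy per-game shots (x) and hits (y).
shots = [3, 5, 2, 7, 4]
hits = [2, 3, 1, 2, 3]

a, b = line_fit(shots, hits)
# np.polyfit returns coefficients highest degree first: [slope, intercept].
slope, intercept = np.polyfit(shots, hits, 1)
print(a, b, intercept, slope)
```

The iterative `leastsq` route is more general (it accepts any residual function), while the closed form only applies to models that are linear in their parameters.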
