# Overview

## &copy;  [Omkar Mehta](mailto:omehta2@illinois.edu) ##
### Industrial and Enterprise Systems Engineering, The Grainger College of Engineering,  UIUC ###

<hr style="border:2px solid blue">

This notebook will show you how to create and query a table or DataFrame that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) is the Databricks File System, which allows you to store data for querying inside of Databricks. This notebook assumes that you already have a file inside of DBFS that you would like to read from.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

In [0]:
# Reader settings (only meaningful for CSV input; other formats ignore them)
infer_schema = "false"
first_row_is_header = "false"
delimiter = ","

# Where the uploaded file lives in DBFS and which reader format to use
file_location = "/FileStore/tables/game_skater_stats.csv"
file_type = "csv"

# NOTE: with the header option set to "false", the columns are auto-named
# _c0, _c1, ... and the file's own header line is returned as the first data row.
df = (
    spark.read.format(file_type)
    .option("inferSchema", infer_schema)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load(file_location)
)

display(df)

_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7,_c8,_c9,_c10,_c11,_c12,_c13,_c14,_c15,_c16,_c17,_c18,_c19,_c20,_c21
game_id,player_id,team_id,timeOnIce,assists,goals,shots,hits,powerPlayGoals,powerPlayAssists,penaltyMinutes,faceOffWins,faceoffTaken,takeaways,giveaways,shortHandedGoals,shortHandedAssists,blocked,plusMinus,evenTimeOnIce,shortHandedTimeOnIce,powerPlayTimeOnIce
2016020045,8468513,4,955,1,0,0,2,0,0,0,0,0,1,1,0,0,1,1,858,97,0
2016020045,8476906,4,1396,1,0,4,2,0,0,2,0,0,1,2,0,0,2,0,1177,0,219
2016020045,8474668,4,915,0,0,1,1,0,0,0,0,0,2,0,0,0,0,-1,805,0,110
2016020045,8473512,4,1367,3,0,0,0,0,2,0,11,27,0,0,0,0,0,-1,1083,19,265
2016020045,8471762,4,676,0,0,3,2,0,0,0,0,0,0,1,0,0,0,-1,613,63,0
2016020045,8478439,4,1008,1,0,2,0,0,0,0,0,0,1,0,0,0,0,-1,911,0,97
2016020045,8479648,4,630,0,0,0,2,0,0,0,0,0,0,0,0,0,0,-1,630,0,0
2016020045,8470159,4,409,0,0,0,0,0,0,2,2,7,1,0,0,0,0,-1,360,49,0
2016020045,8478500,4,1275,0,0,0,1,0,0,0,0,0,0,1,0,0,0,-5,1015,139,121


In [0]:
# Create a view or table
# Register the DataFrame loaded above as a temporary view so that it can be
# queried by name from SQL.

temp_table_name = "game_skater_stats_csv"

# Creates the view, or replaces an existing temp view of the same name.
df.createOrReplaceTempView(temp_table_name)

In [0]:
%sql

/* Query the temp view registered from the DataFrame in a SQL cell.
   The DataFrame was read with the header option set to "false", so the
   columns are named _c0 .. _c21 and the CSV header line shows up as the
   first data row. */

select * from `game_skater_stats_csv`

_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7,_c8,_c9,_c10,_c11,_c12,_c13,_c14,_c15,_c16,_c17,_c18,_c19,_c20,_c21
game_id,player_id,team_id,timeOnIce,assists,goals,shots,hits,powerPlayGoals,powerPlayAssists,penaltyMinutes,faceOffWins,faceoffTaken,takeaways,giveaways,shortHandedGoals,shortHandedAssists,blocked,plusMinus,evenTimeOnIce,shortHandedTimeOnIce,powerPlayTimeOnIce
2016020045,8468513,4,955,1,0,0,2,0,0,0,0,0,1,1,0,0,1,1,858,97,0
2016020045,8476906,4,1396,1,0,4,2,0,0,2,0,0,1,2,0,0,2,0,1177,0,219
2016020045,8474668,4,915,0,0,1,1,0,0,0,0,0,2,0,0,0,0,-1,805,0,110
2016020045,8473512,4,1367,3,0,0,0,0,2,0,11,27,0,0,0,0,0,-1,1083,19,265
2016020045,8471762,4,676,0,0,3,2,0,0,0,0,0,0,1,0,0,0,-1,613,63,0
2016020045,8478439,4,1008,1,0,2,0,0,0,0,0,0,1,0,0,0,0,-1,911,0,97
2016020045,8479648,4,630,0,0,0,2,0,0,0,0,0,0,0,0,0,0,-1,630,0,0
2016020045,8470159,4,409,0,0,0,0,0,0,2,2,7,1,0,0,0,0,-1,360,49,0
2016020045,8478500,4,1275,0,0,0,1,0,0,0,0,0,0,1,0,0,0,-5,1015,139,121


In [0]:
# With this registered as a temp view, it will only be available to this particular notebook. If you'd like other users to be able to query this table, you can also create a table from the DataFrame.
# Once saved, this table will persist across cluster restarts as well as allow various users across different notebooks to query this data.
# To do so, choose your table name and uncomment the bottom line.
#
# NOTE(review): this is the same string as temp_table_name, so the saved table
# would share its name with the temp view registered earlier -- consider a
# distinct name before uncommenting.
# NOTE(review): at this point `df` was read without a header or schema inference,
# so the saved table would carry _c0.._c21 columns and the header line as data.

permanent_table_name = "game_skater_stats_csv"

# df.write.format("parquet").saveAsTable(permanent_table_name)

## Read Data

In [0]:
# Re-read the skater stats CSV, this time treating the first line as column
# names and letting Spark infer the column types.
file_location = "/FileStore/tables/game_skater_stats.csv"
df = (
    spark.read.format("CSV")
    .option("inferSchema", True)
    .option("header", True)
    .load(file_location)
)

# Preview the first five rows
display(df.take(5))

game_id,player_id,team_id,timeOnIce,assists,goals,shots,hits,powerPlayGoals,powerPlayAssists,penaltyMinutes,faceOffWins,faceoffTaken,takeaways,giveaways,shortHandedGoals,shortHandedAssists,blocked,plusMinus,evenTimeOnIce,shortHandedTimeOnIce,powerPlayTimeOnIce
2016020045,8468513,4,955,1,0,0,2,0,0,0,0,0,1,1,0,0,1,1,858,97,0
2016020045,8476906,4,1396,1,0,4,2,0,0,2,0,0,1,2,0,0,2,0,1177,0,219
2016020045,8474668,4,915,0,0,1,1,0,0,0,0,0,2,0,0,0,0,-1,805,0,110
2016020045,8473512,4,1367,3,0,0,0,0,2,0,11,27,0,0,0,0,0,-1,1083,19,265
2016020045,8471762,4,676,0,0,3,2,0,0,0,0,0,0,1,0,0,0,-1,613,63,0


## Write Data

In [0]:
# Save as parquet and as CSV
#
# mode='overwrite' makes this cell safe to re-run: without an explicit mode the
# writer raises an error as soon as the target path already exists.

# DBFS
df.write.save('/FileStore/parquet/game__stats', format='parquet', mode='overwrite')

# S3
#df.write.parquet("s3a://my_bucket/game_skater_stats", mode="overwrite")

# DBFS
# header=True keeps the column names in the CSV output, consistent with the
# S3 example below.
df.write.save('/FileStore/parquet/game__stats.csv', format='csv', mode='overwrite', header=True)

# S3
#df.coalesce(1).write.format("com.databricks.spark.csv")
#   .option("header", "true").save("s3a://my_bucket/game_skater_stats.csv")


## Transforming Data

In [0]:
# Expose the typed DataFrame to Spark SQL under the name "stats".
df.createOrReplaceTempView("stats")

# Five players with the most goals in total.
# `group by 1` and `order by 3 desc` are positional references into the select
# list (player_id and the summed goals). sum(1) counts rows per player and is
# reported as `games` (assumes one row per player per game -- TODO confirm).
display(spark.sql("""
  select player_id, sum(1) as games, sum(goals) as goals
  from stats
  group by 1
  order by 3 desc
  limit 5
"""))

player_id,games,goals
8471214,1424,862
8466139,1734,565
8471675,1221,560
8462042,1270,543
8474564,1020,524


In [0]:
# Player lookup table: read the player info CSV with a header row and
# inferred column types. Note that this rebinds file_location.
file_location = "/FileStore/tables/player_info.csv"
names = (
    spark.read.format("CSV")
    .option("inferSchema", True)
    .option("header", True)
    .load(file_location)
)
# Uncomment to preview the table:
#display(names)

In [0]:
# Register the stats view again so this cell does not depend on an earlier
# cell having created it.
df.createOrReplaceTempView("stats")

# Five players with the most goals in total (positional group by / order by
# refer to player_id and the summed goals).
top_players = spark.sql("""
select player_id, sum(1) as games, sum(goals) as goals
from stats
group by 1
order by 3 desc
limit 5
""")

# Expose both DataFrames to SQL so they can be joined by name.
top_players.createOrReplaceTempView("top_players")
names.createOrReplaceTempView("names")

# Attach first and last names to the top scorers via player_id;
# `order by 2 desc` sorts on the second select column, goals.
display(spark.sql("""
select p.player_id, goals, firstName, lastName
from top_players p
join names n
  on p.player_id = n.player_id
order by 2 desc  
"""))


player_id,goals,firstName,lastName
8471214,862,Alex,Ovechkin
8466139,565,Patrick,Marleau
8471675,560,Sidney,Crosby
8462042,543,Jarome,Iginla
8474564,524,Steven,Stamkos


In [0]:
# Average goals per game, bucketed by a date built from the first six
# characters of game_id: chars 1-4 are used as the year, chars 5-6 as the
# month, and the day is fixed to 01.
# NOTE(review): the output alias `goals_per_goal` looks like a typo for
# "goals per game" -- the expression is sum(goals) / count(distinct game_id).
# NOTE(review): every bucket in the recorded output ends in "-02-01", so
# chars 5-6 of game_id (e.g. 2016020045) may not be a calendar month --
# confirm the game_id layout.
display(spark.sql("""
select cast(substring(game_id, 1, 4) || '-' 
  || substring(game_id, 5, 2) || '-01' as Date) as month
  , sum(goals)/count(distinct game_id) as goals_per_goal
from stats
group by 1
order by 1
"""))

month,goals_per_goal
2000-02-01,5.5130081300813005
2001-02-01,5.236585365853658
2002-02-01,5.308943089430894
2003-02-01,5.123577235772358
2005-02-01,6.047154471544715
2006-02-01,5.755284552845528
2007-02-01,5.438211382113821
2008-02-01,5.695934959349594
2009-02-01,5.53089430894309
2010-02-01,5.464227642276422


In [0]:

# Histogram of career goals-per-shot ratios.
# The inner query totals shots and goals per player and keeps only players
# with at least 5 goals. The outer query truncates each ratio down to a
# multiple of 0.02 (cast(ratio * 50 as int) / 50.0) and counts the players
# that fall in each bucket.
display(spark.sql("""
select cast(goals/shots * 50 as int)/50.0 as Goals_per_shot, sum(1) as Players 
from (
  select player_id, sum(shots) as shots, sum(goals) as goals
  from stats
  group by 1
  having goals >= 5
)  
group by 1
order by 1
"""))  
  

Goals_per_shot,Players
0.0,11
0.02,183
0.04,341
0.06,338
0.08,407
0.1,352
0.12,260
0.14,75
0.16,31
0.18,11


In [0]:
# Average goals per game, bucketed by a date built from the first six
# characters of game_id: chars 1-4 are used as the year, chars 5-6 as the
# month, and the day is fixed to 01.
# NOTE(review): the output alias `goals_per_goal` looks like a typo for
# "goals per game" -- the expression is sum(goals) / count(distinct game_id).
# NOTE(review): every bucket in the recorded output ends in "-02-01", so
# chars 5-6 of game_id (e.g. 2016020045) may not be a calendar month --
# confirm the game_id layout.
display(spark.sql("""
  select cast(substring(game_id, 1, 4) || '-' 
    || substring(game_id, 5, 2) || '-01' as Date) as month
    , sum(goals)/count(distinct game_id) as goals_per_goal
  from stats
  group by 1
  order by 1
"""))

month,goals_per_goal
2000-02-01,5.5130081300813005
2001-02-01,5.236585365853658
2002-02-01,5.308943089430894
2003-02-01,5.123577235772358
2005-02-01,6.047154471544715
2006-02-01,5.755284552845528
2007-02-01,5.438211382113821
2008-02-01,5.695934959349594
2009-02-01,5.53089430894309
2010-02-01,5.464227642276422


In [0]:
# Histogram of career goals-per-shot ratios.
# The inner query totals shots and goals per player and keeps only players
# with at least 5 goals. The outer query truncates each ratio down to a
# multiple of 0.02 (cast(ratio * 50 as int) / 50.0) and counts the players
# that fall in each bucket.
display(spark.sql("""
  select cast(goals/shots * 50 as int)/50.0 as Goals_per_shot
      ,sum(1) as Players 
  from (
    select player_id, sum(shots) as shots, sum(goals) as goals
    from stats
    group by 1
    having goals >= 5
  )  
  group by 1
  order by 1
"""))

Goals_per_shot,Players
0.0,11
0.02,183
0.04,341
0.06,338
0.08,407
0.1,352
0.12,260
0.14,75
0.16,31
0.18,11


## MLlib: Linear Regression

In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

# Pack the predictor columns into a single "features" vector column
feature_columns = ['shots', 'assists', 'penaltyMinutes', 'timeOnIce']
assembler = VectorAssembler(
    inputCols=feature_columns,
    outputCol="features",
)
train_df = assembler.transform(df)

# Linear regression predicting goals from the assembled feature vector
lr = LinearRegression(
    featuresCol='features',
    labelCol='goals',
)
lr_model = lr.fit(train_df)

# Report the coefficients and the fit metrics from the model's own summary
# (computed on the data it was fitted to, not on a hold-out set)
trainingSummary = lr_model.summary
print("Coefficients: " + str(lr_model.coefficients))
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("R2: %f" % trainingSummary.r2)

## Pandas UDFs

In [0]:
# creating a linear fit for a single player
# Pull every stats row for one player into a local pandas DataFrame so the
# fit can be prototyped with scipy before it is distributed as a pandas UDF.

df.createOrReplaceTempView("stats")

# player_id 8471214 is the top scorer returned by the queries above.
sample_pd = spark.sql("""
select * from stats
where player_id = 8471214
""").toPandas()

from scipy.optimize import leastsq
import numpy as np

def fit(params, x, y):
    """Return the residuals of ``y`` against the line ``params[0] + params[1] * x``.

    Args:
        params: indexable pair holding the intercept (index 0) and slope (index 1).
        x: predictor value(s); scalars and array-likes supporting arithmetic both work.
        y: observed value(s), same shape as ``x``.

    Returns:
        ``y`` minus the line's prediction, element-wise.
    """
    intercept = params[0]
    slope = params[1]
    predicted = intercept + x * slope
    return y - predicted

# Least-squares fit of hits against shots for the sampled player:
# `fit` receives shots as x and hits as y, and [1, 0] is the starting guess
# for (intercept, slope).
result = leastsq(fit, [1, 0], args=(sample_pd.shots, sample_pd.hits))
print(result)


In [0]:
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *
import pandas as pd

# Output schema of the grouped-map UDF defined below: one row per player with
# the player's ID and the two fitted line parameters (p0 intercept, p1 slope).
schema = StructType([StructField('ID', LongType(), True),
                     StructField('p0', DoubleType(), True),
                     StructField('p1', DoubleType(), True)])  

  
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def analyze_player(sample_pd):
    """Fit the line hits = p0 + p1 * shots for one player's rows.

    Registered as a grouped-map pandas UDF: it is called once per group with
    that group's rows as a pandas DataFrame and returns a one-row DataFrame
    whose columns match ``schema`` (ID, p0, p1).

    Args:
        sample_pd: pandas DataFrame with the rows of a single player; the
            ``player_id``, ``shots`` and ``hits`` columns are read.

    Returns:
        One-row pandas DataFrame holding the player's ID and the fitted
        intercept (p0) and slope (p1). A group with at most one row yields
        p0 = p1 = 0.0, since two parameters cannot be fitted to one point.
    """
    # Positional access: the label-based ``player_id[0]`` only works when the
    # group's index happens to contain the label 0.
    player_id = sample_pd.player_id.iloc[0]

    if len(sample_pd.shots) <= 1:
        # Floats, not ints, so the columns match the DoubleType fields in schema.
        return pd.DataFrame({'ID': [player_id], 'p0': [0.0], 'p1': [0.0]})

    # leastsq returns the fitted parameters as the first element of its result.
    params = leastsq(fit, [1, 0], args=(sample_pd.shots, sample_pd.hits))[0]
    return pd.DataFrame({'ID': [player_id], 'p0': [params[0]], 'p1': [params[1]]})

# Run the per-player fit across the cluster: one analyze_player call per
# player_id group, with the one-row results combined into a Spark DataFrame.
# NOTE(review): on Spark 3.x the GROUPED_MAP + groupby().apply() form is
# deprecated in favour of groupby().applyInPandas() -- confirm the cluster version.
player_df = df.groupby('player_id').apply(analyze_player)
display(player_df.take(5))