# Analyzing NBA player and team stats with Spark/Redshift
### Data Engineering Capstone Project

#### Project Summary


The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

## Step 1: Scope the Project and Gather Data

### Scope

The goal of this capstone project is to:
* Collect NBA player data, season stats data, and team data.
* Extract data from S3 files (CSV, JSON, and TXT formats) into Spark DataFrames.
* Clean and transform the data with Spark, and write it back to S3 in Parquet format.
* Load the Parquet data into Redshift tables.
* Analyze the NBA dataset with SQL for further insights. I will write queries to answer questions such as:
  * Which team has the best winning percentage?
  * Which team has the most star players?
  * Who are the top 10 coaches in history?
  * Who is the most efficient player? The best 3-point shooter? The best defensive player in terms of blocks and steals?
  * How has the game evolved over time? For example, are teams shooting more 3-pointers, or focusing more on defense?
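Answering the first question involves a weighting choice: averaging per-season W/L% treats a shortened season (e.g. the 50-game 1998-99 lockout season) the same as an 82-game one. A minimal sketch, assuming the team records have already been cleaned into hypothetical `(season, team, wins, losses)` tuples, that instead pools wins and games per team:

```python
from collections import defaultdict

# Hypothetical cleaned rows: (season, team, wins, losses); the trailing "*"
# seen in Team_Records.csv team names is assumed to be stripped already.
records = [
    ("2004-05", "Boston Celtics", 45, 37),
    ("2003-04", "Boston Celtics", 36, 46),
]


def pooled_win_pct(rows):
    """Return {team: total wins / total games}, so every game counts equally."""
    wins = defaultdict(int)
    games = defaultdict(int)
    for _season, team, w, l in rows:
        wins[team] += w
        games[team] += w + l
    return {team: wins[team] / games[team] for team in games if games[team] > 0}


def top_teams(rows, n=10):
    """Rank teams by pooled winning percentage, best first."""
    pct = pooled_win_pct(rows)
    return sorted(pct.items(), key=lambda item: item[1], reverse=True)[:n]


print(top_teams(records))  # Boston: 81 wins out of 164 games
```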

### Describe and Gather Data 

#### DataSet 1: NBA player and player stats per season.
https://www.kaggle.com/drgilermo/nba-players-stats

This dataset contains aggregate individual statistics for 67 NBA seasons, ranging from basic box-score attributes such as points, assists, and rebounds to more advanced "Moneyball"-like features such as Value Over Replacement Player (VORP).
The data was scraped from [basketball-reference](https://www.basketball-reference.com/).

* **Players.csv**: 
This file contains basic player information, e.g. height, weight, and college.
Since player names in this file are nearly all unique (the few duplicates found in Step 2 are identical records), I will mainly use this CSV file to create the player table. Sample data:

|Id | Player | height | weight | collage | born | birth_city | birth_state |
|:-|:-|:-|:-|:-|:-|:-|:-|
|2590 | Vince Carter | 198 | 99 | University of North Carolina | 1977 | Daytona Beach | Florida |

* **player_data.json**: 
This file contains extra player information, e.g. a more accurate birth date.
Because this file contains duplicate player names (see Step 2: Explore and Assess the Data), I will only use it to augment the birth-date information in the player table.
```
    "4290": {
        "name": "Russell Westbrook",
        "year_start": "2009",
        "year_end": "2018",
        "position": "G",
        "height": "6-3",
        "weight": "200",
        "birth_date": "November 12, 1988",
        "college": "University of California, Los Angeles"
    }
```

* **Seasons_Stats.csv**: 
This file contains NBA player stats over all the seasons, from 1950 to 2015. 
The column names are abbreviated, e.g. **3P%** is the 3-Point Field Goal Percentage (available since the 1979-80 NBA season), computed as 3P / 3PA.
More detailed column descriptions can be found in the basketball-reference [glossary](https://www.basketball-reference.com/about/glossary.html).
I may expand them into more human-readable names when creating the Redshift tables. Sample data:

| Id | Year | Player | Pos | Age | Tm | ... | AST | STL | BLK | TOV | PF | PTS |
|:-|:-|:-|:-|:-|:-|:-|:-|:-|:-|:-|:-|:-|
| 16746 | 2004 | LeBron James | SG | 19 | CLE | ... | 465 | 130 | 58 | 273 | 149 | 1654 |
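The shooting percentages behind the "most efficient player" and "best 3-point shooter" questions can be recomputed from the count columns. A minimal sketch using the basketball-reference glossary formulas for 3P%, effective field goal percentage (eFG%) and true shooting percentage (TS%), returning `None` instead of dividing by zero; treating a missing 3P as 0 for seasons before the 3-point line existed is an assumption:

```python
def safe_div(num, den):
    """Return num / den, or None when either value is missing or den is zero."""
    if num is None or not den:
        return None
    return num / den


def three_pt_pct(fg3, fg3a):
    # 3P% = 3P / 3PA
    return safe_div(fg3, fg3a)


def effective_fg_pct(fg, fg3, fga):
    # eFG% = (FG + 0.5 * 3P) / FGA; a made three counts as 1.5 field goals.
    # Assumption: a missing 3P (seasons before 1979-80) is treated as 0.
    if fg is None:
        return None
    return safe_div(fg + 0.5 * (fg3 or 0), fga)


def true_shooting_pct(pts, fga, fta):
    # TS% = PTS / (2 * (FGA + 0.44 * FTA)); 0.44 * FTA estimates possessions used on free throws
    if fga is None or fta is None:
        return None
    return safe_div(pts, 2 * (fga + 0.44 * fta))


print(true_shooting_pct(100, 80, 20))  # 100 / 177.6
```

In Spark these could be wrapped with `udf`, although the same arithmetic can also be written directly as column expressions.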



#### DataSet 2: NBA team record per season.
https://www.kaggle.com/boonpalipatana/nba-season-records-from-every-year

This dataset contains every season record for each NBA team over 73 seasons (wins, losses, standing, playoff result, and more).
* **Team_Records.csv**:
This file contains each NBA team's record for every season from 1946 to 2017.

| Season | Lg | Team | W | L | W/L% | Finish | ...  | Coaches | Top WS |
|:-|:-|:-|:-|:-|:-|:-|:-|:-|:-|
| 2004-05 | NBA | Boston Celtics* | 45 | 37 | 0.549 | 1 | ...       | D. Rivers (45-37) | P. Pierce (11.2) |
| 2003-04 | NBA | Boston Celtics* | 36 | 46 | 0.439 | 4 | ...       | J. O'Brien (22-24) J. Carroll (14-22) | P. Pierce (7.1) |
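Team_Records.csv labels seasons as `2004-05` and reports W/L% rounded to three decimals. A minimal sketch of two small derivations, assuming (from LeBron James's rookie row in the Seasons_Stats sample: Year 2004, age 19) that the `Year` column in Seasons_Stats.csv is the calendar year in which a season ends, so that `2003-04` should join to `Year = 2004`; the function names are hypothetical:

```python
def season_end_year(season):
    """Map a season label such as '2003-04' (or '1999-00') to the year it ends."""
    start, suffix = season.split("-")
    end_year = int(start) + 1
    # sanity check: the two-digit suffix must match the following year
    if end_year % 100 != int(suffix):
        raise ValueError(f"unexpected season label: {season!r}")
    return end_year


def win_pct(wins, losses):
    """Recompute W/L% as wins / games, rounded like the source column."""
    games = wins + losses
    return round(wins / games, 3) if games else None


print(season_end_year("2003-04"), win_pct(36, 46))  # 2004 0.439
```

The recomputed value can be compared against the W/L% column (loaded as `WoLpc` in Step 2) as a cheap consistency check.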

Multiple coaches can coach the same team in a season, so I need to parse a Coaches value such as "J. O'Brien (22-24) J. Carroll (14-22)" into a list of coaching records (coach, wins, losses).
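A minimal sketch of that parsing step, assuming every coach entry has the form `Name (W-L)` separated by whitespace as in the sample rows; the regex and the name `parse_coaches` are illustrative. Summing the parsed records gives a quick consistency check against the team's W and L columns (22 + 14 = 36 and 24 + 22 = 46 for the 2003-04 Celtics):

```python
import re

# One entry: a coach name followed by a "(wins-losses)" record.
COACH_RE = re.compile(r"\s*([^()]+?)\s*\((\d+)-(\d+)\)")


def parse_coaches(coaches):
    """Split a Coaches cell into [(coach, wins, losses), ...] in listed order."""
    if not coaches:
        return []
    return [
        (m.group(1), int(m.group(2)), int(m.group(3)))
        for m in COACH_RE.finditer(coaches)
    ]


history = parse_coaches("J. O'Brien (22-24) J. Carroll (14-22)")
print(history)
total_w = sum(w for _, w, _ in history)
total_l = sum(l for _, _, l in history)
print(total_w, total_l)  # 36 46, matching the team's W and L
```

In Spark, a function like this could be wrapped in a UDF returning an array and then exploded (with `explode`, which Step 2 already imports) into one row per coach per season.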


#### DataSet 3: NBA team timeline.
http://www.shrpsports.com/nba/explain.htm

This webpage lists each team's name, abbreviation, and start and end seasons.
Dataset 1 (player stats) uses team abbreviations, while Dataset 2 (team stats) uses full team names. Building the mapping between abbreviations and full names (e.g. GSW => Golden State Warriors) by hand would require a lot of manual work, so I hope to automate the join between the two tables using the information on this webpage.

* **team-abbrevation.txt**:
This file contains the city, abbreviation, team name, and active seasons of each team.

```
Baltimore    	Bal	Baltimore Bullets (2nd team) (1963-64 - 1972-73)
Boston       	Bos	Boston Celtics (1946-47 - present)
Brooklyn     	Bkn	Brooklyn Nets (2012-13 - present)
Buffalo      	Buf	Buffalo Braves (1970-71 - 1977-78)
Capital      	Cap	Capital Bullets (1973-74)
Charlotte    	Cha	Charlotte Hornets (1988-89 - 2001-02, 2014-15 - present)
Cha Bobcats  	ChB	Charlotte Bobcats (2004-05 - 2013-14)
```

## Step 2: Explore and Assess the Data
### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

### Cleaning Steps
Document steps necessary to clean the data

In [1]:
import boto3
import os
import configparser
from datetime import datetime
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# use F.sum for the Spark aggregate; importing `sum` directly would shadow Python's built-in sum
from pyspark.sql.functions import udf, col, isnan, when, count, trim, desc, asc
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.functions import countDistinct, explode, split, concat_ws, collect_list
from pyspark.sql.types import (
    StructType as R,
    StructField as Fld,
    DoubleType as Dbl,
    StringType as Str,
    IntegerType as Int,
    DateType as Date,
    TimestampType as Ts,
)

In [2]:
config = configparser.ConfigParser()

# AWS credentials are read from a local dwh.cfg (normally they would live in ~/.aws/credentials)
with open('dwh.cfg') as f:
    config.read_file(f)

KEY    = config.get('AWS', 'KEY')
SECRET = config.get('AWS', 'SECRET')

# export the credentials as environment variables for the AWS clients
os.environ["AWS_ACCESS_KEY_ID"] = KEY
os.environ["AWS_SECRET_ACCESS_KEY"] = SECRET

In [3]:
spark = SparkSession.builder\
                     .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0")\
                     .getOrCreate()

In [None]:
# load players.csv
playerSchema = R([
    Fld("id", Int()),
    Fld("Player", Str()),
    Fld("height", Int()),
    Fld("weight", Int()),
    Fld("collage", Str()),
    Fld("born", Int()),
    Fld("birth_city", Str()),
    Fld("birth_state", Str()),
])
dfPlayer = spark.read.csv("s3a://udacity-data-eng-capstone/Players.csv", header=True, schema=playerSchema)
#dfPlayer.printSchema()
dfPlayer.show(5)
print("count = ", dfPlayer.count())

In [None]:
# rename dfPlayer.Player column as dfPlayer.name
# rename dfPlayer.collage column (misspelled in the source CSV) as dfPlayer.college
dfPlayer = dfPlayer.withColumnRenamed("Player", "name").withColumnRenamed("collage", "college")

In [None]:
dfPlayer.select("name").where(dfPlayer.name.like('%Iverson%')).show()

#### Clean up player names: some Hall of Famers have an asterisk in their names, e.g. "Yao Ming*", "Allen Iverson*"

In [None]:
# trim * in name
dfPlayer = dfPlayer.withColumn("name", F.regexp_replace("name", r"\*+", ""))
#dfPlayer = dfPlayer.withColumn("name", F.regexp_replace("name", "([\w+\s]+)", "$1")) #figure out capture group

In [None]:
# verify names are trimmed
dfPlayer.select("name").where(dfPlayer.name.like('%Iverson%')).show()
dfPlayer.select("name").where(dfPlayer.name.like('%Yao Ming%')).show()

In [None]:
# are there players with the same name?
dfPlayer.groupBy("name").count().filter("count > 1").show(truncate=False)

In [None]:
# inspect player with identical names
dfPlayer.where(dfPlayer.name == 'Patrick Ewing').show(truncate=False)
dfPlayer.where(dfPlayer.name == 'Gary Payton').show(truncate=False)

In [None]:
# These records are identical except for id, so it is safe to drop the duplicates
print("before delete, num rows", dfPlayer.count())
dfPlayer = dfPlayer.dropDuplicates(["name", "born"])
print("after  delete, num rows", dfPlayer.count())

### Load player_data2.json into the DataFrame `dfPlayerExtra`. This file contains duplicate player names, but its birth dates are more accurate than those in Players.csv

* Parse each player's birth date from dfPlayerExtra, and add extra columns (birth_day, birth_month, birth_year) to dfPlayer.
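For reference, the split-and-map approach in the following cells can also be expressed with a single format string. A minimal plain-Python sketch, assuming every birth_date looks like `November 12, 1988` and that the process runs in an English/C locale (`%B` matches full month names in the current locale); the name `parse_birth_date` is hypothetical, and it rejects impossible dates instead of failing later:

```python
from datetime import datetime


def parse_birth_date(text):
    """Parse 'November 12, 1988' into (year, month, day); None if missing or malformed."""
    if not text:
        return None
    try:
        parsed = datetime.strptime(text.strip(), "%B %d, %Y")
    except ValueError:
        return None
    return parsed.year, parsed.month, parsed.day


print(parse_birth_date("November 12, 1988"))  # (1988, 11, 12)
```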

In [None]:
# load player_data2.json
playerExtraSchema = R([
    Fld("name", Str()),
    Fld("year_start", Int()),
    Fld("year_end", Int()),
    Fld("position", Str()),
    Fld("height", Str()),
    Fld("weight", Int()),
    Fld("birth_date", Str()),
    Fld("college", Str()),
])
# json file was generated by `df.to_json('player_data2.json', orient='records', indent=4)`
dfPlayerExtra = spark.read.option("multiline", "true").json(
    "s3a://udacity-data-eng-capstone/player_data2.json"
)
#dfPlayerExtra.printSchema()
dfPlayerExtra.show(5)
print("count = ", dfPlayerExtra.count())

In [None]:
dfPlayerExtra = dfPlayerExtra.withColumn(
    "name", F.regexp_replace("name", r"\*+", "") # trim * in name
).withColumn(
    "birth_date_split", F.split(F.regexp_replace("birth_date", ",", ""), " ")
)

dfPlayerExtra = dfPlayerExtra.withColumn(
    "birth_month", dfPlayerExtra.birth_date_split.getItem(0) # full month name; converted to an int (January=>1) below
).withColumn(
    "birth_day",   dfPlayerExtra.birth_date_split.getItem(1).cast(Int())
).withColumn(
    "birth_year",  dfPlayerExtra.birth_date_split.getItem(2).cast(Int())
).drop(
    "birth_date_split"
).drop(
    "birth_date"
).dropna(
    subset=["birth_year", "birth_month", "birth_day"]
)

In [None]:
dfPlayerExtra.select(["name", "birth_year", "birth_month", "birth_day"]).show(5)

In [None]:
# find all distinct months
dfPlayerExtra.select("birth_month").dropDuplicates().show()

In [None]:
# convert month Str=>Int, e.g. January=>1
map_month = {
    "July":         7,
    "November":     11,
    "February":     2,
    "January":      1,
    "March":        3,
    "October":      10,
    "May":          5,
    "August":       8,
    "April":        4,
    "June":         6,
    "December":     12,
    "September":    9,
}

def translate(mapping):
    def translate_(col):
        return mapping.get(col, col)
    return udf(translate_, Int())

dfPlayerExtra = dfPlayerExtra.withColumn("birth_month", translate(map_month)("birth_month"))
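
The hand-written `map_month` dictionary can also be derived from the standard library's `calendar.month_name`, which lists full month names for the current locale (English under the default C locale, which is an assumption here). This sketch also returns `None` for unknown names rather than echoing the input string back, which makes explicit what the integer-typed UDF would most likely turn into NULL anyway; it could be registered the same way, e.g. `udf(month_to_int, Int())`:

```python
import calendar

# calendar.month_name == ['', 'January', ..., 'December'] in the C/English locale
MONTH_TO_NUM = {name: num for num, name in enumerate(calendar.month_name) if name}


def month_to_int(name):
    """Return 1-12 for a full month name, or None when the name is unknown."""
    return MONTH_TO_NUM.get(name)


print(month_to_int("September"))  # 9
```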

In [None]:
# check translate is successful
dfPlayerExtra.select("birth_month").dropDuplicates().show()
dfPlayerExtra.show(2)
#dfPlayerExtra.printSchema()

In [None]:
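# add column birth_ts (timestamp); (year, month, day, ts) will be split into a dim table later, when loading to Redshift
from datetime import datetime

def make_birth_ts():
    def make_birth_ts_(y, m, d):
        # return NULL instead of failing the job on missing parts (e.g. an unmapped month) or impossible dates
        if y is None or m is None or d is None:
            return None
        try:
            return datetime(y, m, d)
        except ValueError:
            return None
    return udf(make_birth_ts_, Ts())

dfPlayerExtra = dfPlayerExtra.withColumn("birth_ts", make_birth_ts()("birth_year", "birth_month", "birth_day"))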

In [None]:
# check that the column was added
dfPlayerExtra.show(2)

In [None]:
uniqPlayer = dfPlayer.select("name").dropDuplicates().collect()
uniqPlayerExtra = dfPlayerExtra.select("name").dropDuplicates().collect()

In [None]:
# players in extra, not in orig table
diff1 = set(uniqPlayerExtra) - set(uniqPlayer)
print(len(diff1))

In [None]:
# players in orig table, not in extra
# dfPlayer p1 left join dfPlayerExtra p2 on p1.name = p2.name, how many rows will have null value on p2
diff2 = set(uniqPlayer) - set(uniqPlayerExtra)
print(len(diff2))
print(diff2)

In [None]:
# print the number of nulls in each column
dfPlayer.select([count(when(col(c).isNull(), c)).alias(c) for c in dfPlayer.columns]).show()
dfPlayerExtra.select([count(when(col(c).isNull(), c)).alias(c) for c in dfPlayerExtra.columns]).show()

In [None]:
dfJoinPlayer = dfPlayer.join(
    dfPlayerExtra,
    (dfPlayer.name == dfPlayerExtra.name) & (dfPlayer.born == dfPlayerExtra.birth_year),
    "left"
).drop(
    dfPlayer.born
).drop(
    dfPlayerExtra.name
).drop(
    dfPlayerExtra.college
).drop(
    dfPlayerExtra.height
).drop(
    dfPlayerExtra.weight
)
print("dfPlayer      count = ", dfPlayer.count())
print("dfPlayerExtra count = ", dfPlayerExtra.count())
print("dfJoinPlayer  count = ", dfJoinPlayer.count())

In [None]:
dfJoinPlayer.show(2)

#### Create the dfBirthTime DataFrame and save it to S3.

In [None]:
dfBirthTime = dfJoinPlayer.select(["birth_month", "birth_day", "birth_year", "birth_ts"]).dropDuplicates().dropna("any")
dfBirthTime.count()

In [None]:
dfBirthTime.write.parquet("s3a://udacity-data-eng-capstone-parquet/dimBirthTime/", mode="overwrite")

In [None]:
teamStatsSchema = R([
    Fld("Season", Str()),
    Fld("Lg", Str()),
    Fld("Team", Str()),
    Fld("W", Int()),
    Fld("L", Int()),
    Fld("WoLpc", Dbl()), # W/L%
    Fld("Finish", Int()),
    Fld("SRS", Dbl()),
    Fld("Pace", Dbl()),
    Fld("Rel_Pace", Dbl()),
    Fld("ORtg", Dbl()),
    Fld("Rel_ORtg", Dbl()),
    Fld("DRtg", Dbl()),
    Fld("Rel_DRtg", Dbl()),
    Fld("Playoffs", Str()),
    Fld("Coaches", Str()),
    Fld("Top_WS", Str()),  # "Top WS" in the CSV; many Spark versions reject spaces in Parquet column names
])
dfTeamStats = spark.read.csv(
    "s3a://udacity-data-eng-capstone/Team_Records.csv",
    header=True, schema=teamStatsSchema,
)
#dfTeamStats.printSchema()
dfTeamStats.show(5)
print("count = ", dfTeamStats.count())

In [None]:
playerStatsSchema = R([
    Fld("Id",       Int()),
    Fld("Year",     Int()),
    Fld("Player",   Str()),
    Fld("Pos",      Str()),
    Fld("Age",      Int()),
    Fld("Tm",       Str()),
    Fld("G",        Int()),
    Fld("GS",       Int()),
    Fld("MP",       Int()),
    Fld("PER",      Dbl()),
    Fld("TS%",      Dbl()),
    Fld("3PAr",     Dbl()),
    Fld("FTr",      Dbl()),
    Fld("ORB%",     Dbl()),
    Fld("DRB%",     Dbl()),
    Fld("TRB%",     Dbl()),
    Fld("AST%",     Dbl()),
    Fld("STL%",     Dbl()),
    Fld("BLK%",     Dbl()),
    Fld("TOV%",     Dbl()),
    Fld("USG%",     Dbl()),
    Fld("blanl",    Str()),
    Fld("OWS",      Dbl()),
    Fld("DWS",      Dbl()),
    Fld("WS",       Dbl()),
    Fld("WS/48",    Dbl()),
    Fld("blank2",   Str()),
    Fld("OBPM",     Dbl()),
    Fld("DBPM",     Dbl()),
    Fld("BPM",      Dbl()),
    Fld("VORP",     Dbl()),
    Fld("FG",       Int()),
    Fld("FGA",      Int()),
    Fld("FG%",      Dbl()),
    Fld("3P",       Int()),
    Fld("3PA",      Int()),
    Fld("3P%",      Dbl()),
    Fld("2P",       Int()),
    Fld("2PA",      Int()),
    Fld("2P%",      Dbl()),
    Fld("eFG%",     Dbl()),
    Fld("FT",       Int()),
    Fld("FTA",      Int()),
    Fld("FT%",      Dbl()),
    Fld("ORB",      Int()),
    Fld("DRB",      Int()),
    Fld("TRB",      Int()),
    Fld("AST",      Int()),
    Fld("STL",      Int()),
    Fld("BLK",      Int()),
    Fld("TOV",      Int()),
    Fld("PF",       Int()),
    Fld("PTS",      Int()),
])

dfPlayerStats = spark.read.csv(
    "s3a://udacity-data-eng-capstone/Seasons_Stats.csv",
    header=True, schema=playerStatsSchema)

#dfPlayerStats.printSchema()
dfPlayerStats.show(5)
print("count = ", dfPlayerStats.count())

## Step 3: Define the Data Model
### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

### 3.3 Create Redshift

In [None]:
import pandas as pd
import boto3
import json

In [None]:
import configparser
config = configparser.ConfigParser()
config.read_file(open('dwh.cfg'))

KEY                    = config.get('AWS','KEY')
SECRET                 = config.get('AWS','SECRET')

DWH_CLUSTER_TYPE       = config.get("DWH","DWH_CLUSTER_TYPE")
DWH_NUM_NODES          = config.get("DWH","DWH_NUM_NODES")
DWH_NODE_TYPE          = config.get("DWH","DWH_NODE_TYPE")

DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")
DWH_DB                 = config.get("DWH","DWH_DB")
DWH_DB_USER            = config.get("DWH","DWH_DB_USER")
DWH_DB_PASSWORD        = config.get("DWH","DWH_DB_PASSWORD")
DWH_PORT               = config.get("DWH","DWH_PORT")

DWH_IAM_ROLE_NAME      = config.get("DWH", "DWH_IAM_ROLE_NAME")

(DWH_DB_USER, DWH_DB_PASSWORD, DWH_DB)

pd.DataFrame({"Param": ["DWH_CLUSTER_TYPE", "DWH_NUM_NODES", "DWH_NODE_TYPE", "DWH_CLUSTER_IDENTIFIER", 
                        "DWH_DB", "DWH_DB_USER", "DWH_DB_PASSWORD", "DWH_PORT", "DWH_IAM_ROLE_NAME"],
              "Value": [ DWH_CLUSTER_TYPE ,  DWH_NUM_NODES ,  DWH_NODE_TYPE ,  DWH_CLUSTER_IDENTIFIER , 
                         DWH_DB ,  DWH_DB_USER ,  DWH_DB_PASSWORD ,  DWH_PORT ,  DWH_IAM_ROLE_NAME ]
             })

In [None]:
args = {
    "region_name": "us-west-2",
    "aws_access_key_id": KEY,
    "aws_secret_access_key": SECRET
}

ec2 = boto3.resource('ec2', **args)
s3 = boto3.resource('s3', **args)
iam = boto3.client('iam', **args)
redshift = boto3.client('redshift', **args)

In [None]:
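import itertools

s3bucket = s3.Bucket("udacity-data-eng-capstone-parquet")  # private bucket

# print (at most) the first 5 objects written under dimBirthTime/; avoids StopIteration if there are fewer
for obj in itertools.islice(s3bucket.objects.filter(Prefix="dimBirthTime/"), 5):
    print(obj)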

In [None]:
try:
    print('1.1 Creating a new IAM Role')
    dwhRole = iam.create_role(
        Path='/',
        RoleName=DWH_IAM_ROLE_NAME,
        Description = "Allows Redshift clusters to call AWS services on your behalf.",
        AssumeRolePolicyDocument=json.dumps(
            {'Statement': [{'Action': 'sts:AssumeRole',
               'Effect': 'Allow',
               'Principal': {'Service': 'redshift.amazonaws.com'}}],
             'Version': '2012-10-17'})
    )    

except Exception as e:
    print(e)

In [None]:
print('1.2 Attaching Policy')
iam.attach_role_policy(RoleName=DWH_IAM_ROLE_NAME,
                       PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
                      )['ResponseMetadata']['HTTPStatusCode']

In [None]:
print('1.3 Get the IAM role ARN')
roleArn = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']
print(roleArn)

In [None]:
try:
    response = redshift.create_cluster(        
        # parameters for hardware
        ClusterType=DWH_CLUSTER_TYPE,
        NodeType=DWH_NODE_TYPE,
        NumberOfNodes=int(DWH_NUM_NODES),

        # parameters for identifiers & credentials
        DBName=DWH_DB,
        ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
        MasterUsername=DWH_DB_USER,
        MasterUserPassword=DWH_DB_PASSWORD,
        
        # parameter for role (to allow s3 access)
        IamRoles=[roleArn]
    )
except Exception as e:
    print(e)

In [None]:
def prettyRedshiftProps(props):
    pd.set_option('display.max_colwidth', None)  # None = no truncation; -1 is deprecated in newer pandas
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", "NumberOfNodes", 'VpcId']
    x = [(k, v) for k,v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key", "Value"])

In [None]:
# check the cluster status; re-run this cell until ClusterStatus is 'available'
myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)
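
Rather than re-running the cell by hand, the status check can be polled. A minimal sketch of a generic polling loop; `wait_for_status` is a hypothetical helper, and in this notebook `get_status` would be something like `lambda: redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]['ClusterStatus']`. boto3 also ships a ready-made waiter for this case, `redshift.get_waiter('cluster_available')`, which may be the simpler choice:

```python
import time


def wait_for_status(get_status, target="available", poll_seconds=30,
                    timeout_seconds=1800, sleep=time.sleep):
    """Call get_status() until it returns `target`; raise TimeoutError after timeout_seconds."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = get_status()
        if status == target:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"status is still {status!r} after {timeout_seconds}s")
        sleep(poll_seconds)  # injectable so the loop can be tested without waiting


# Usage with a fake status source instead of a real cluster
fake_statuses = iter(["creating", "creating", "available"])
print(wait_for_status(lambda: next(fake_statuses), sleep=lambda s: None))  # available
```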

### Print the endpoint and role ARN and copy them into dwh.cfg; erase them before submitting or pushing to GitHub

In [None]:
DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
DWH_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']
print("DWH_ENDPOINT :: ", DWH_ENDPOINT)
print("DWH_ROLE_ARN :: ", DWH_ROLE_ARN)

In [None]:
%load_ext sql

## Step 4: Run Pipelines to Model the Data 
### 4.1 Create the data model
Build the data pipelines to create the data model.

In [None]:
# Write code here

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
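A minimal sketch of the count, uniqueness and not-null checks described above, written as plain functions so the inputs can come from either side of the pipeline (for example `dfPlayer.count()` on the Spark side and a `SELECT COUNT(*)` against a hypothetical player table in Redshift). The function names are illustrative; each raises `ValueError` on failure so a pipeline run stops at the first broken check:

```python
def check_count(table, source_count, target_count):
    """Source/count check: the target must be non-empty and match the source row count."""
    if target_count == 0:
        raise ValueError(f"{table}: target table is empty")
    if source_count != target_count:
        raise ValueError(f"{table}: expected {source_count} rows, found {target_count}")


def check_unique(table, keys):
    """Integrity check: each key value must appear exactly once."""
    seen, duplicates = set(), set()
    for key in keys:
        if key in seen:
            duplicates.add(key)
        seen.add(key)
    if duplicates:
        raise ValueError(f"{table}: duplicate keys, e.g. {sorted(duplicates)[:5]}")


def check_not_null(table, column, values):
    """Integrity check: a required column has no NULL (None) values."""
    nulls = sum(1 for v in values if v is None)
    if nulls:
        raise ValueError(f"{table}.{column}: {nulls} NULL value(s)")


check_count("player", 3, 3)
check_unique("player", ["Vince Carter", "LeBron James", "Yao Ming"])
check_not_null("player", "name", ["Vince Carter", "LeBron James", "Yao Ming"])
print("all checks passed")
```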
 
Run Quality Checks

In [None]:
# Perform quality checks here

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

## Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

In [None]:
redshift.delete_cluster(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER, SkipFinalClusterSnapshot=True)
iam.detach_role_policy(RoleName=DWH_IAM_ROLE_NAME, PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess")
iam.delete_role(RoleName=DWH_IAM_ROLE_NAME)