## Feature Engineering

### Feature transformation

The following none exhaustive list gives you some guidelines for feature transformation:

· Imputing Some algorithms are very sensitive to missing values. Therefore, imputing allows for filling of empty fields based on its value distribution 
<br><font color=blue>I have chosen filter out missing values</font>

· Imputed time-series quantization Time series often contain streams with measurements at different timestamps. Therefore, it is beneficial to quantize measurements to a common “heart beat” and impute the corresponding values. This can be done by sampling from the source time series distributions on the respective quantized time steps
<br><font color=blue>All season measurements for for the year.  By convention, I choose the start of the calendar year to represent the season that starts in Sept of that year (and end the Feb of the next year)</font>

· Scaling / Normalizing / Centering Some algorithms are very sensitive differences in value ranges for individual fields. Therefore, it is best practice to center data around zero and scale values to a standard deviation of one
<br><font color=blue>I normalize the variables that have been selected to between 0 and 1</font>

· Filtering Sometimes imputing values doesn’t perform well, therefore deletion of low quality records is a better strategy
<br><font color=blue>I have chosen filter out missing values</font>

· Discretizing Continuous fields might confuse the model, e.g. a discrete set of age ranges sometimes performs better than continuous values, especially on smaller amounts of data and with simpler models 
<br><font color=blue>Currently, all continuos fields are left as they are.  Will review the model performance to see if grouping will work better</font>

In [1]:
import pandas as pd
import numpy as np

In [2]:
all_df = pd.read_csv(r'pfr_nfl.csv')

### Feature Creation

The following none exhaustive list gives you some guidelines for feature creation:

· One-hot-encoding Categorical integer features should be transformed into “one-hot” vectors. In relational terms this results in addition of additional columns – one columns for each distinct category

· Time-to-Frequency transformation Time-series (and sometimes also sequence data) is recorded in the time domain but can easily transformed into the frequency domain e.g. using FFT (Fast Fourier Transformation)

· Month-From-Date Creating an additional feature containing the month independent from data captures seasonal aspects. Sometimes further discretization in to quarters helps as well

· Aggregate-on-Target Simply aggregating fields the target variable (or even other fields) can improve performance, e.g. count number of data points per ZIP code or take the median of all values by geographical region

In [3]:
# use passing / rushing / receiving attempts as a feature, averaged over past 3y
all_df['3Y-Passing-Att'] = all_df.groupby('Player')['Passing-Att'].apply(lambda x: x.rolling(3).mean())
all_df['3Y-Rushing-Att'] = all_df.groupby('Player')['Rushing-Att'].apply(lambda x: x.rolling(3).mean())
all_df['3Y-Receiving-Tgt'] = all_df.groupby('Player')['Receiving-Tgt'].apply(lambda x: x.rolling(3).mean())
all_df[' '] = all_df.groupby('Player')['Fantasy-FantPt'].apply(lambda x: x.rolling(3).mean())

# strip out out the current year which is the label that the ML will try to predict
all_df['3Y-Passing-Att'] = (all_df['3Y-Passing-Att']*3 - all_df['Passing-Att']) / 2
all_df['3Y-Rushing-Att'] = (all_df['3Y-Rushing-Att']*3 - all_df['Rushing-Att']) / 2
all_df['3Y-Receiving-Tgt'] = (all_df['3Y-Receiving-Tgt']*3 - all_df['Receiving-Tgt']) / 2
all_df['3Y-Fantasy-FantPt'] = (all_df['3Y-Fantasy-FantPt']*3 - all_df['Fantasy-FantPt']) / 2

**Second iteration**

Have added an additional feature for each player that is based on the number of points scored by that player in that season over the next highest scoring player.

In [4]:
# player points over next highest points at same position in the same year
all_df['FP_above_next_at_posn'] = all_df.groupby(['FantPos','Year'])['Fantasy-FantPt'].apply(lambda x: -x.diff().shift(-1))
# 3 yr avg of that metric by player
all_df['3Y-FP_over'] = all_df.groupby('Player')['FP_above_next_at_posn'].apply(lambda x: x.rolling(3).mean())

**Third iteration**

Add age going into season

In [5]:
all_df['Age_into_season'] = all_df.groupby('Player')['Age'].apply(lambda x: x.rolling(3).mean())

In [6]:
all_df.sample(20)

Unnamed: 0.1,Unnamed: 0,Year,Accolades,Rk_x,Player,Tm,FantPos,Age,G_x,GS,...,FUM,Lost,TO,3Y-Passing-Att,3Y-Rushing-Att,3Y-Receiving-Tgt,3Y-Fantasy-FantPt,FP_above_next_at_posn,3Y-FP_over,Age_into_season
2287,2287,2015,,128,Drew Stanton,ARI,QB,30,9,8.0,...,23.0,11.0,9.0,179.5,21.5,0.0,77.0,5.0,5.333333,28.666667
1226,1226,2010,,189,Correll Buckhalter,DEN,RB,32,15,0.0,...,29.0,15.0,-9.0,0.0,98.0,35.0,93.5,1.0,3.333333,31.0
1758,1758,2013,,59,Marvin Jones,CIN,WR,23,16,3.0,...,20.0,10.0,1.0,,,,,2.0,,
2743,2743,2017,,131,C.J. Beathard,SF,QB,24,7,5.0,...,14.0,8.0,-3.0,,,,,2.0,,
1082,1082,2010,,45,Santana Moss,WAS,WR,31,16,16.0,...,29.0,12.0,-4.0,0.0,1.5,129.0,126.0,3.0,2.333333,30.0
2791,2791,2017,,179,Brian Hoyer,2TM,QB,32,11,6.0,...,,,,319.0,15.5,0.0,119.5,2.0,1.666667,30.666667
1317,1317,2011,,60,Dustin Keller,NYJ,TE,27,16,12.0,...,25.0,16.0,-3.0,0.0,0.5,91.0,83.0,1.0,1.333333,26.0
2362,2362,2015,,203,Tavon Austin,LAR,WR,24,15,8.0,...,22.0,10.0,5.0,0.5,22.5,56.5,72.0,-0.0,1.666667,23.666667
136,136,2005,,137,Marcus Robinson,MIN,WR,30,15,9.0,...,,,,,,,,2.0,,
792,792,2008,,177,Billy Miller,NO,TE,31,15,5.0,...,25.0,16.0,11.0,,,,,1.0,,


In [7]:
# use select team offensive stats as features, averaged over past 3y.
# These indicate more plays and better quality (results) of the plays
# These will capture the fact that individual performance is also tied into team performance
all_df['3Y-Scrm Plys'] = all_df.groupby('Team')['Scrm Plys'].apply(lambda x: x.rolling(3).mean())
all_df['3Y-1st/G'] = all_df.groupby('Team')['1st/G'].apply(lambda x: x.rolling(3).mean())
all_df['3Y-3rd Pct'] = all_df.groupby('Team')['3rd Pct'].apply(lambda x: x.rolling(3).mean())
all_df['3Y-TO'] = all_df.groupby('Team')['TO'].apply(lambda x: x.rolling(3).mean())

# strip out out current year
all_df['3Y-Scrm Plys'] = (all_df['3Y-Scrm Plys']*3 - all_df['Scrm Plys']) / 2
all_df['3Y-1st/G'] = (all_df['3Y-1st/G']*3 - all_df['1st/G']) / 2
all_df['3Y-3rd Pct'] = (all_df['3Y-3rd Pct']*3 - all_df['3rd Pct']) / 2
all_df['3Y-TO'] = (all_df['3Y-TO']*3 - all_df['TO']) / 2

In [8]:
# nfl_seasons['ToP/G'] = pd.to_datetime(nfl_seasons['ToP/G'], format='%M:%S')
# nfl_seasons['3Y-ToP/G'] = nfl_seasons.groupby('Team')['ToP/G'].apply(lambda x: x.rolling(3).mean())

    # not bothering
    # NotImplementedError: ops for Rolling for this dtype datetime64[ns] are not implemented    

In [9]:
# For classification
# Refine the problem for a multiclassifier of tier of player
#		top 5 players: 0
#		top 6-10: 1
#		top 11-15: 2
#		Rest: 3

# Get the rank by position for the year
all_df['player_rank'] = all_df.groupby(['FantPos','Year'])['Fantasy-FantPt'].rank(ascending=False)

In [10]:
def get_tier(rank):
    if rank <=5:
        return 0
    if rank <=10:
        return 1
    if rank <=15:
        return 2
    else:
        return 3
    
all_df['player_tier'] = all_df.player_rank.apply(lambda x: get_tier(x))

In [11]:
# get the attempts per postion
# {QB: throws, RB: rushes, WR: receptions, TE: receptions}
def get_posn_attempts(df_row):
    if df_row.FantPos == 'QB':
        return df_row['3Y-Passing-Att']
    if df_row.FantPos == 'RB':
        return df_row['3Y-Rushing-Att']
    if df_row.FantPos == 'WR':
        return df_row['3Y-Receiving-Tgt']
    if df_row.FantPos == 'TE':
        return df_row['3Y-Receiving-Tgt']
    else:
        return 0

In [12]:
all_df['posn_attempts'] = all_df.apply(lambda x: get_posn_attempts(x), axis=1)

In [13]:
all_df.columns

Index(['Unnamed: 0', 'Year', 'Accolades', 'Rk_x', 'Player', 'Tm', 'FantPos',
       'Age', 'G_x', 'GS', 'Passing-Cmp', 'Passing-Att', 'Passing-Yds',
       'Passing-TD', 'Passing-Int', 'Rushing-Att', 'Rushing-Yds',
       'Rushing-Y/A', 'Rushing-TD', 'Receiving-Tgt', 'Receiving-Rec',
       'Receiving-Yds', 'Receiving-Y/R', 'Receiving-TD', 'Fumbles-Fmb',
       'Fumbles-FL', 'Scoring-TD', 'Scoring-2PM', 'Scoring-2PP',
       'Fantasy-FantPt', 'Fantasy-PPR', 'Fantasy-DKPt', 'Fantasy-FDPt',
       'Fantasy-VBD', 'Fantasy-PosRank', 'Fantasy-OvRank', 'Year_DT', 'Rk_y',
       'Team', 'Team_Short', 'G_y', 'Pts/G', 'TotPts', 'Scrm Plys', 'Yds/G',
       'Yds/P', '1st/G', '3rd Md', '3rd Att', '3rd Pct', '4th Md', '4th Att',
       '4th Pct', 'Pen', 'Pen Yds', 'ToP/G', 'FUM', 'Lost', 'TO',
       '3Y-Passing-Att', '3Y-Rushing-Att', '3Y-Receiving-Tgt',
       '3Y-Fantasy-FantPt', 'FP_above_next_at_posn', '3Y-FP_over',
       'Age_into_season', '3Y-Scrm Plys', '3Y-1st/G', '3Y-3rd Pct', '3Y-TO',


In [14]:
all_df.to_csv(r'pfr_nfl_w_features.csv', index=False)

## Data Cleaning
***

From the assignment page (https://www.coursera.org/learn/advanced-data-science-capstone/supplement/HJHBm/data-cleansing):

The following none exhaustive list gives you some guidelines:

· Data types Are data types of columns matching their content? E.g. is age stored as integer and not as string?
<br><font color=blue>I have extracted only the fields I think are relevant.  Each of those fields are cast to the appropriate type</font>

· Ranges Does the value distribution of values in a column make sense? Use stats (e.g. min, max, mean, standard deviation) and visualizations (e.g. box-plot, histogram) for help
<br><font color=blue>I have checked fantasy points and spot checked some of the other fields</font>

· Emptiness Are all values non-null where mandatory? E.g. client IDs
<br><font color=blue>I have filtered out null values</font>

· Uniqueness Are duplicates present where undesired? E.g. client IDs
<br><font color=blue>Each player and year row is unique</font>

· Set memberships Are only allowed values chosen for categorical or ordinal fields? E.g. Female, Male, Unknown
<br><font color=blue>Fantasy Position is categorical, all others are continuous ordinal fields</font>

· Foreign key set memberships Are only allowed values chosen as field? E.g. ZIP code
<br><font color=blue>All numeric fields are coerced to float and nulls are then filtered out</font>

· Regular expressions Some files need to stick to a pattern expressed by a regular expression. E.g. a lower-case character followed by 6 digits
<br><font color=blue>N/A.  All data in each dataset comes from one file</font>

· Cross-field validation Some fields can impact validity of other fields. E.g. a male person can’t be pregnant 
<br><font color=blue>Yes, RB have predominantly rushing stats, QBs passing stats, WR receiving stats</font>

Some initial data cleaning has already been covered in the YahooFF.data_exp.python.v1.py module.  More checks will be performed here to ensure all data are in the correct format

In [14]:
sdf = (spark.read.format("csv").options(header="true").load("pfr_nfl_w_features.csv"))
rdd = sdf.rdd
sdf.createOrReplaceTempView("sdf")

In [15]:
# Update Types
sdf = spark.sql('''
SELECT cast(Year as int) Year, Player, Tm, FantPos,
cast(`3Y-Passing-Att` as float) `3Y-Passing-Att`,
cast(`3Y-Rushing-Att` as float) `3Y-Rushing-Att`,
cast(`3Y-Receiving-Tgt` as float) `3Y-Receiving-Tgt`,
cast(`3Y-Fantasy-FantPt` as float) `3Y-Fantasy-FantPt`,
cast(`3Y-FP_over` as float) `3Y-FP_over`,
cast(`3Y-Scrm Plys` as float) `3Y-Scrm_Plys`,
cast(`3Y-1st/G` as float) `3Y-1st/G`,
cast(`3Y-3rd Pct` as float) `3Y-3rd_Pct`,
cast(`3Y-TO` as float) `3Y-TO`,
cast(`player_tier` as float) `player_tier`,
cast(`posn_attempts` as float) `player_attempts`,
cast(`Fantasy-FantPt` as float) `Fantasy-FantPt`,
cast(`Age_into_season` as float) `Age_into_season`
FROM sdf
WHERE `3Y-Passing-Att` is not null
AND `3Y-Rushing-Att` is not null
AND `3Y-Receiving-Tgt` is not null
AND `3Y-Fantasy-FantPt` is not null
AND `3Y-FP_over` is not null
AND `3Y-Scrm Plys` is not null
AND `3Y-1st/G` is not null
AND `3Y-3rd Pct` is not null
AND `3Y-TO` is not null
AND `player_tier` is not null
AND `posn_attempts` is not null
AND `Age_into_season` is not null
''')
sdf.createOrReplaceTempView("sdf")

## Create normalized field by group

It is not possible to use group by with pyspark minmaxscaler. So we replicate this with a SQL call

https://stackoverflow.com/questions/50043101/spark-minmaxscaler-on-dataframe

In [16]:
# Create a table with min and max values for attempts per position in a given year.  To be used for normalization
norm_df = spark.sql('''
SELECT FantPos AS FantPos2, cast(Year as int) Year2, min(player_attempts) AS norm_min, max(player_attempts) AS norm_max
FROM sdf
GROUP BY FantPos, Year
''')
norm_df.createOrReplaceTempView("norm_df")

In [17]:
norm_df.show()

+--------+-----+--------+--------+
|FantPos2|Year2|norm_min|norm_max|
+--------+-----+--------+--------+
|      RB| 2016|     0.0|   392.0|
|      RB| 2015|     1.5|   313.0|
|      RB| 2011|     0.0|   337.0|
|      TE| 2009|    63.0|   154.0|
|      QB| 2010|   186.0|   585.5|
|      WR| 2013|    50.5|   181.0|
|      QB| 2011|   179.0|   586.0|
|      WR| 2012|    59.5|   179.5|
|      TE| 2016|    38.0|   131.0|
|      WR| 2015|    44.5|   174.0|
|      RB| 2017|     1.0|   342.5|
|      QB| 2012|   221.0|   657.5|
|      WR| 2014|    50.5|   180.0|
|      QB| 2007|   204.5|   539.5|
|      QB| 2015|   164.5|   654.5|
|      TE| 2008|    51.0|   134.0|
|      RB| 2010|    77.0|   338.5|
|      QB| 2016|   204.0|   659.0|
|      WR| 2011|    59.0|   172.0|
|      RB| 2014|    50.5|   308.0|
+--------+-----+--------+--------+
only showing top 20 rows



In [18]:
sdf = spark.sql('''
SELECT * FROM norm_df
LEFT JOIN sdf
ON (norm_df.FantPos2 = sdf.FantPos AND norm_df.Year2 = sdf.Year)
''')
sdf = sdf.drop('FantPos2').drop('Year2')
sdf.show()

+--------+--------+----+-----------------+---+-------+--------------+--------------+----------------+-----------------+----------+------------+--------+----------+-----+-----------+---------------+--------------+---------------+
|norm_min|norm_max|Year|           Player| Tm|FantPos|3Y-Passing-Att|3Y-Rushing-Att|3Y-Receiving-Tgt|3Y-Fantasy-FantPt|3Y-FP_over|3Y-Scrm_Plys|3Y-1st/G|3Y-3rd_Pct|3Y-TO|player_tier|player_attempts|Fantasy-FantPt|Age_into_season|
+--------+--------+----+-----------------+---+-------+--------------+--------------+----------------+-----------------+----------+------------+--------+----------+-----+-----------+---------------+--------------+---------------+
|     0.0|   392.0|2016|       Matt Jones|WAS|     RB|           0.0|           0.0|            78.5|             72.0| 2.3333333|      1009.0|    21.6|      45.0|  0.0|        3.0|            0.0|          67.0|           24.0|
|     0.0|   392.0|2016|      Doug Martin| TB|     RB|           0.0|         134.0|

In [19]:
# using rdd function for column manipulation, easier than with SQL
# center start of range to zero
sdf = sdf.withColumn("player_att_norm", (sdf["player_attempts"]-sdf["norm_min"])/(sdf["norm_max"]-sdf["norm_min"]))

**Iteration 4**

This creates a min and max for each position for each field, and where the player falls from a scale from zero to one. <br>
This creates a distribution around the position average score and assigns the player a z-score based on his points relative to that distribution for the year

This essentially does what I did manually in feature_eng.python.v1 with player attempts but wraps it in a function so I can loop over many fields

In [1]:
# load the csv file with the current progress.
sdf = (spark.read.format("csv").options(header="true").load("pfr_nfl_w_features_2.csv"))
sdf.createOrReplaceTempView("sdf")

In [20]:
# fields to generate a normalized feature for and for which positions they are relevant

fields_to_use = ["3Y-Passing-Att",
                "3Y-Rushing-Att",
                "3Y-Receiving-Tgt",
                "3Y-Fantasy-FantPt",
                "3Y-FP_over",
                "3Y-Scrm_Plys",
                "3Y-1st/G",
                "3Y-3rd_Pct",
                "3Y-TO",                
                "player_attempts",
                "Fantasy-FantPt",
                "Age_into_season"]

for_posn = [['QB'],
            ['RB'],
            ['WR', 'TE'],
            ['QB', 'WR', 'TE', 'RB'],
            ['QB', 'WR', 'TE', 'RB'],
            ['QB', 'WR', 'TE', 'RB'],
            ['QB', 'WR', 'TE', 'RB'],
            ['QB', 'WR', 'TE', 'RB'],
            ['QB', 'WR', 'TE', 'RB'],
            ['QB', 'WR', 'TE', 'RB'],
            ['QB', 'WR', 'TE', 'RB'],
            ['QB', 'WR', 'TE', 'RB'],
            ['QB', 'WR', 'TE', 'RB']]

In [21]:
from pyspark.sql.functions import when
import datetime as dt

In [22]:
import pdb

In [23]:
def feature_creation(field, func1, func2, relevant_posn, sdf):
    # creates extra variables to use to normalize a players stats for his position and season    
    # relevant_posn is a list of relevant positions
    norm_df = spark.sql('''
        SELECT FantPos AS FantPos2, cast(Year as int) Year2, {1}(`{0}`) AS `{0}_{1}`, {2}(`{0}`) AS `{0}_{2}`
        FROM sdf
        GROUP BY FantPos, Year
        '''.format(field, func1, func2))
    # zeros out the stat for non-relevant position.  i.e. don't wnat the WR that passed once in the season on a trick play to have an impact
    norm_df = norm_df.withColumn('{0}_{2}'.format(field, func1, func2), when(norm_df.FantPos2.isin(relevant_posn),norm_df['{0}_{2}'.format(field, func1, func2)]).otherwise(0))
    norm_df.createOrReplaceTempView("norm_df") # this is now available to query with SQL
    
    sdf = spark.sql('''
        SELECT * FROM norm_df
        LEFT JOIN sdf
        ON (norm_df.FantPos2 = sdf.FantPos AND norm_df.Year2 = sdf.Year)
        ''')
    sdf = sdf.drop('FantPos2').drop('Year2')    
    
    # calculate normalized value for the field relative to other players    
    if func1 == 'min':
        # for normalized values between min and max
        # (val-min) / (max-min)
        #pdb.set_trace()
        sdf = sdf.withColumn('{0}_minmax'.format(field, func1, func2),
                             (sdf[field]-sdf['{0}_{1}'.format(field, func1, func2)])
                             / (sdf['{0}_{2}'.format(field, func1, func2)]-sdf['{0}_{1}'.format(field, func1, func2)]))
        sdf = sdf.na.fill({'{0}_minmax'.format(field):0})
    elif func1 == 'mean':
        # for values in standard deviation terms
        # (val-mean)/std
        sdf = sdf.withColumn('{0}_zscore'.format(field, func1, func2),
                             (sdf[field]-sdf['{0}_{1}'.format(field, func1, func2)]) / sdf['{0}_{2}'.format(field, func1, func2)])
        sdf = sdf.na.fill({'{0}_zscore'.format(field):0})
    
    #pdb.set_trace()
    sdf.createOrReplaceTempView("sdf")  # so sdf is updated in the global namespace too.  can use in loop
    print('{3}: Calculated {1}{2} for {0}.  sdf has {4} columns.'.format(field, func1, func2, dt.datetime.now(), len(sdf.columns)))
    return sdf
    

In [24]:
# create features for each of the fields.  zero out the feature for positions that it's not relevant.
for f, p in zip(fields_to_use, for_posn):
    #sdf = feature_creation(f, 'min', 'max', p)
    sdf = feature_creation(f, 'mean', 'std', p, sdf)

2018-12-28 20:14:29.347757: Calculated meanstd for 3Y-Passing-Att.  sdf has 20 columns.
2018-12-28 20:14:29.448874: Calculated meanstd for 3Y-Rushing-Att.  sdf has 23 columns.
2018-12-28 20:14:29.590848: Calculated meanstd for 3Y-Receiving-Tgt.  sdf has 26 columns.
2018-12-28 20:14:29.821536: Calculated meanstd for 3Y-Fantasy-FantPt.  sdf has 29 columns.
2018-12-28 20:14:30.197260: Calculated meanstd for 3Y-FP_over.  sdf has 32 columns.
2018-12-28 20:14:30.884424: Calculated meanstd for 3Y-Scrm_Plys.  sdf has 35 columns.
2018-12-28 20:14:32.139416: Calculated meanstd for 3Y-1st/G.  sdf has 38 columns.
2018-12-28 20:14:34.527292: Calculated meanstd for 3Y-3rd_Pct.  sdf has 41 columns.
2018-12-28 20:14:39.236741: Calculated meanstd for 3Y-TO.  sdf has 44 columns.
2018-12-28 20:14:48.713739: Calculated meanstd for player_attempts.  sdf has 47 columns.
2018-12-28 20:15:07.234124: Calculated meanstd for Fantasy-FantPt.  sdf has 50 columns.
2018-12-28 20:15:43.822874: Calculated meanstd for 

Interestingly, this operation gets very slow after a few iterations.  There is some inefficiency here, maybe around continually createOrReplaceTempView the sdf?  There are some instances where I have gotten an out of memory error or the spark session just ends.

I had to run restart the kernel and run just these lines of code specifically.

In [26]:
try:
    sdf = sdf.drop(sdf['_c0'])  # joining the table in iterations (which is what I did at first) produces this bogus field
except:
    pass

In [25]:
sdf.toPandas().to_csv(r'pfr_nfl_w_features_2.csv')

In [27]:
# Save a version before the features get encoded
sdf.write.parquet("ff_df_encoded_normalized.parquet", mode='overwrite')

## Prepare the pipeline to encode data

In [11]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [12]:
#Load sdf from parquet file.  Start from this point if spark gets shut down, or you get an out of memory error, etc
sdf = spark.read.parquet('ff_df_encoded_normalized.parquet')
sdf.createOrReplaceTempView("sdf")

In [13]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

In [14]:
# if you have to restart the pipeline
try:
    del(stages)
except:
    pass

In [15]:
# categoricalColumns = ["Player", "Tm", "FantPos"]
categoricalColumns = ["FantPos"]
stages = [] # stages in our Pipeline
for categoricalCol in categoricalColumns:
    # Category Indexing with StringIndexer
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    # Use OneHotEncoder to convert categorical variables into binary SparseVectors
    # encoder = OneHotEncoder(inputCol=categoricalCol + "Index", outputCol=categoricalCol + "classVec")
    encoder = OneHotEncoder(inputCol=categoricalCol + "Index", outputCol=categoricalCol + "classVec")
    # Add stages.  These are not run here, but will run all at once later on.
    stages += [stringIndexer, encoder]

In [16]:
# Convert label into label indices using the StringIndexer
label_stringIdx = StringIndexer(inputCol="player_tier", outputCol="label")
stages += [label_stringIdx]

Get all features into a features vector. <br>
For the first pass, I had used receiving, rushing, and passing atttempts from prior years <br>
For the second pass, I also added a fantasy point difference over the next closest player from prior years <br>

In [17]:
# Transform all features into a vector using VectorAssembler
numericCols = ['3Y-Passing-Att','3Y-Rushing-Att','3Y-Receiving-Tgt','3Y-Fantasy-FantPt','3Y-Scrm_Plys','3Y-1st/G','3Y-3rd_Pct','3Y-TO','Age_into_season',
               '3Y-Passing-Att_zscore','3Y-Rushing-Att_zscore','3Y-Receiving-Tgt_zscore','3Y-Fantasy-FantPt_zscore','3Y-FP_over_zscore','3Y-Scrm_Plys_zscore','3Y-1st/G_zscore',
               '3Y-3rd_Pct_zscore','3Y-TO_zscore','player_attempts_zscore','Fantasy-FantPt_zscore','Age_into_season_zscore']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [18]:
partialPipeline = Pipeline().setStages(stages)

partialPipeline.getStages()

[StringIndexer_4ae5a60d50019d8476b7,
 OneHotEncoder_4a71af2e4cd0a36f0600,
 StringIndexer_4320996c4b7483e6a408,
 VectorAssembler_4060bb4e7031c7d91c5c]

In [19]:
pipelineModel = partialPipeline.fit(sdf)

In [20]:
preppedDataDF = pipelineModel.transform(sdf)
preppedDataDF.createOrReplaceTempView("preppedDataDF")

In [21]:
preppedDataDF.show()

+--------------------+-------------------+-------------------+------------------+--------------------+-------------------+-------------------+------------------+------------------+-----------------+-----------------+------------------+------------------+-----------------+-----------------+------------------+----------------------+---------------------+---------------------+--------------------+-------------------+------------------+--------------------+------------------+----+----------------+---+-------+--------------+--------------+----------------+-----------------+----------+------------+--------+----------+-----+-----------+---------------+--------------+---------------+---------------------+---------------------+-----------------------+------------------------+--------------------+--------------------+--------------------+-------------------+-------------------+----------------------+---------------------+----------------------+------------+---------------+-----+-----------------

In [22]:
# Save prepared dataframe
preppedDataDF.write.parquet("preppedDataDF.parquet", mode='overwrite')