### Matching MLB Player Stats with Weekly Twitter Sentiment

In [26]:
from ftfy import fix_text
from textblob import TextBlob
import pandas as pd
from collections import defaultdict
import json
import os
import sys
from pprint import pprint

### Tabular MLB Stats data Spark

In [19]:
# Start up spark context
from pyspark import SparkContext
sc.stop()
sc = SparkContext()

In [79]:
#Grab draftkings and MLB data
data_pitchers = pd.read_csv("../Data_Engineering_Project/data/pitchers_2016_06_26.csv")
data_batters = pd.read_csv("../Data_Engineering_Project/data/batters_2016_06_26.csv")
data_DK = pd.read_csv("../Data_Engineering_Project/data/DKSalaries_2016_06_26.csv")

#Grab the player's names as unique ID
pitchers_names = data_pitchers["Player Name"]
batters_names = data_batters["Player Name"]
DK_names = data_DK["Name"]


In [353]:
data_pitchers.head()

Unnamed: 0.1,Unnamed: 0,Player Name,Team,Games,Innings Pitched,Starts,Quality Starts,Quality Start %,K/BB,K/9IP,BB/9IP,HR/9IP,Batting Avg on Balls IP,Strand Rate,FB VLO,ERA,Fielding Ind Pitching
0,1,Clayton Kershaw,LAD,15,115.0,15,14,93.3,20.14,11.03,0.55,0.47,0.259,80.3,93.0,1.57,1.61
1,2,Johnny Cueto,SF,15,109.3,15,13,86.7,4.57,7.9,1.73,0.25,0.283,79.0,91.4,2.06,2.4
2,3,Madison Bumgarner,SF,16,108.3,16,14,87.5,4.21,10.14,2.41,0.83,0.279,86.0,90.9,1.99,2.98
3,4,Zack Greinke,ARI,16,107.3,16,10,62.5,4.24,7.46,1.76,1.01,0.296,72.1,91.3,3.61,3.6
4,5,Max Scherzer,WAS,16,107.3,16,11,68.8,4.93,11.57,2.35,1.68,0.264,74.7,94.2,3.52,3.86


In [355]:
data_pitchers.columns

Index([u'Unnamed: 0', u'Player Name', u'Team', u'Games', u'Innings Pitched',
       u'Starts', u'Quality Starts', u'Quality Start %', u'K/BB', u'K/9IP',
       u'BB/9IP', u'HR/9IP', u'Batting Avg on Balls IP', u'Strand Rate',
       u'FB VLO', u'ERA', u'Fielding Ind Pitching'],
      dtype='object')

In [356]:
data_pitchers = data_pitchers.drop("Unnamed: 0", 1)

In [None]:
data_pitchers.to_csv("../Data_Engineering_Project/data/batters_2016_06_26_clean.csv", sep=',')

In [76]:
##To clean the batters and pitchers keys
def clean_key(df):
    lst=[]
    for i in df:
        test = i.split(",")
        test[0],test[1] = test[1],test[0]
        test = test[0].strip() + " " + test[1]
        lst.append(test)
    return lst

In [80]:
pitchers_lst = clean_key(data_pitchers['Player Name'])

In [81]:
data_pitchers['Player Name'] = pitchers_lst

In [82]:
data_pitchers.head(3)

Unnamed: 0.1,Unnamed: 0,Player Name,Team,Games,Innings Pitched,Starts,Quality Starts,Quality Start %,K/BB,K/9IP,BB/9IP,HR/9IP,Batting Avg on Balls IP,Strand Rate,FB VLO,ERA,Fielding Ind Pitching
0,1,Clayton Kershaw,LAD,15,115.0,15,14,93.3,20.14,11.03,0.55,0.47,0.259,80.3,93.0,1.57,1.61
1,2,Johnny Cueto,SF,15,109.3,15,13,86.7,4.57,7.9,1.73,0.25,0.283,79.0,91.4,2.06,2.4
2,3,Madison Bumgarner,SF,16,108.3,16,14,87.5,4.21,10.14,2.41,0.83,0.279,86.0,90.9,1.99,2.98


In [83]:
#clean batters
batters_lst = clean_key(data_batters["Player Name"])
data_batters['Player Name'] = batters_lst

In [84]:
data_batters.head(3)

Unnamed: 0.1,Unnamed: 0,Player Name,Team,Position,Games,ABs,Pitches Per AB,Bunts,Hit into DP,Intentional Walks,Walks,SOs,Walk Rate,W2SO Ratio,Contact Rate,Stolen Base Opportunity,Batting Average,Batting Average on BIP
0,1,Mookie Betts,BOS,OF,74,326,3.72,5,6,0,21,51,6.1,0.41,84.4,16.5,0.291,0.305
1,2,Alcides Escobar,KC,SS,73,314,3.37,18,8,0,10,48,3.1,0.21,84.7,18.7,0.252,0.294
2,3,Xander Bogaerts,BOS,SS,73,310,3.8,2,6,0,26,50,7.7,0.52,83.9,11.7,0.345,0.39


In [357]:
#cleaned up data to load into postgress
data_pitchers = data_pitchers.drop("Unnamed: 0", 1)
data_batters = data_batters.drop("Unnamed: 0", 1)
data_pitchers.to_csv("../Data_Engineering_Project/data/pitchers_2016_06_26_clean.csv", sep=',')
data_batters.to_csv("../Data_Engineering_Project/data/batters_2016_06_26_clean.csv", sep=',')

In [85]:
#turn into spark dataframe for ease of joining later
sqlCtx = SQLContext(sc)
batters = sqlCtx.createDataFrame(data_batters)
pitchers = sqlCtx.createDataFrame(data_pitchers)
DraftKings = sqlCtx.createDataFrame(data_DK)


In [87]:
DraftKings.take(3)

[Row(Position=u'SP', Name=u'Clayton Kershaw', Salary=14200, GameInfo=u'LAD@Pit 08:08PM ET', AvgPointsPerGame=30.138, teamAbbrev=u'LAD'),
 Row(Position=u'SP', Name=u'Madison Bumgarner', Salary=13900, GameInfo=u'Phi@SF 04:05PM ET', AvgPointsPerGame=25.522, teamAbbrev=u'SF'),
 Row(Position=u'SP', Name=u'Johnny Cueto', Salary=13600, GameInfo=u'Phi@SF 04:05PM ET', AvgPointsPerGame=25.113000000000003, teamAbbrev=u'SF')]

### Streaming Data using Spark

In [231]:
#lets get the captured tweets locally
with open('../Data_Engineering_Project/twitter_player_data.json') as data_file:    
    pitcher_tweets = json.load(data_file)

In [259]:
#quick look at the data - 40 tweets per player
pitchers_keys =pitcher_tweets.keys()
pitcher_rdd = sc.parallelize(pitchers_keys)
print "Number of pitchers:", len(pitchers_keys)
print "#"*40
pitcher_rdd.map(lambda x: pitcher_tweets[x]).take(2)

 Number of pitchers: 219
########################################


[[u'RT @ccari_argentina: Carri\xf3, Zuvic y Fernando S\xe1nchez piden que se investigue a Anibal Fern\xe1ndez https://t.co/MfK5vKbT52\n\n\xa1 Dale #RT ! http\u2026',
  u'Would you like to see Anibal Sanchez start against the Blue Jays?',
  u'RT @ccari_argentina: Carri\xf3, Zuvic y Fernando S\xe1nchez piden que se investigue a Anibal Fern\xe1ndez https://t.co/MfK5vKbT52\n\n\xa1 Dale #RT ! http\u2026',
  u'RT @ccari_argentina: Carri\xf3, Zuvic y Fernando S\xe1nchez piden que se investigue a Anibal Fern\xe1ndez https://t.co/MfK5vKbT52\n\n\xa1 Dale #RT ! http\u2026',
  u'RT @ccari_argentina: Carri\xf3, Zuvic y Fernando S\xe1nchez piden que se investigue a Anibal Fern\xe1ndez https://t.co/MfK5vKbT52\n\n\xa1 Dale #RT ! http\u2026',
  u'RT @ccari_argentina: Carri\xf3, Zuvic y Fernando S\xe1nchez piden que se investigue a Anibal Fern\xe1ndez https://t.co/MfK5vKbT52\n\n\xa1 Dale #RT ! http\u2026',
  u'RT @ccari_argentina: Carri\xf3, Zuvic y Fernando S\xe1nchez piden que se investigue a Anibal F

In [177]:
#https://realpython.com/blog/python/twitter-sentiment-python-docker-elasticsearch-kibana/
#turn numerical sentiment into values
def sentiment(data): 
    if data < 0.0:
        return "negative"
    elif data == 0.0:
        return "neutral"
    else:
        return "positive"

In [249]:
#combine tweets for sentiment analysis and add index
def sentiment_tweets(rdd,tweets):
    "Concatenate the tweets, make textblob, apply sentiment analysis, add index"
    return rdd.map(lambda x: "".join(tweets[x]))
    .map(lambda x: TextBlob(fix_text(x)))
    .map(lambda x: x.sentiment.polarity).map(sentiment).zipWithIndex()

In [260]:
#build pitcher sentiment rdd and sanity check
pitcher_sentiment_rdd = sentiment_tweets(pitcher_rdd, pitcher_tweets)
print pitcher_sentiment_rdd.take(5), "Pitcher count:", pitcher_sentiment_rdd.count()

[('positive', 0), ('positive', 1), ('positive', 2), ('positive', 3), ('negative', 4)] Pitcher count: 219


In [273]:
#create sql context and rename columns
def sqlcntx(rdd,name1,name2):
    return sqlCtx.createDataFrame(rdd).withColumnRenamed('_1', name1).withColumnRenamed('_2',name2)

In [274]:
#create dataframe and sanity check
pitcher_sentiment_df = sqlcntx(pitcher_sentiment_rdd,"Sentiment","Index")
pitcher_sentiment_df.take(5)

[Row(Sentiment=u'positive', Index=0),
 Row(Sentiment=u'positive', Index=1),
 Row(Sentiment=u'positive', Index=2),
 Row(Sentiment=u'positive', Index=3),
 Row(Sentiment=u'negative', Index=4)]

In [262]:
#create index rdd
def key_rdd(tweets):
    "Create an index for each pitcher"
    keys = tweets.keys()
    keys = map(lambda x : str(x),keys)
    return sc.parallelize(keys).zipWithIndex()

In [263]:
pitchers_keys = key_rdd(pitcher_tweets)    
pitchers_keys.take(3)

[('Anibal Sanchez', 0), ('Brandon Maurer', 1), ('Felix Hernandez', 2)]

In [275]:
#rename columns for pitchers
pitchers_key_sprkdf = sqlcntx(pitchers_keys, "Name", "Index")
pitchers_key_sprkdf.take(3)

[Row(Name=u'Anibal Sanchez', Index=0),
 Row(Name=u'Brandon Maurer', Index=1),
 Row(Name=u'Felix Hernandez', Index=2)]

In [323]:
def sprk_df_join(df1,df2,key1,key2,kind = 'inner'):
    "Join two spark dataframes"
    return df1.join(df2,df1[key1]==df2[key2],kind).drop(df2[key2])

In [326]:
pitcher_joined_rdd = sprk_df_join(pitchers_key_sprkdf,pitcher_sentiment_df,"Index","Index")

In [327]:
pitcher_joined_rdd.take(30)

[Row(Name=u'Adam Wainwright', Index=31, Sentiment=u'positive'),
 Row(Name=u'Chad Bettis', Index=32, Sentiment=u'positive'),
 Row(Name=u'Dan Jennings', Index=33, Sentiment=u'positive'),
 Row(Name=u'Zack Greinke', Index=34, Sentiment=u'positive'),
 Row(Name=u'Robbie Ray', Index=35, Sentiment=u'positive'),
 Row(Name=u'Phil Hughes', Index=36, Sentiment=u'positive'),
 Row(Name=u'Joe Blanton', Index=37, Sentiment=u'negative'),
 Row(Name=u'Matt Moore', Index=38, Sentiment=u'positive'),
 Row(Name=u'Drew Pomeranz', Index=39, Sentiment=u'positive'),
 Row(Name=u'Chris Young', Index=40, Sentiment=u'positive'),
 Row(Name=u'Jeremy Hellickson', Index=41, Sentiment=u'positive'),
 Row(Name=u'Cody Anderson', Index=42, Sentiment=u'positive'),
 Row(Name=u'Doug Fister', Index=43, Sentiment=u'positive'),
 Row(Name=u'Clayton Kershaw', Index=44, Sentiment=u'positive'),
 Row(Name=u'Sean Manaea', Index=45, Sentiment=u'negative'),
 Row(Name=u'Nate Jones', Index=46, Sentiment=u'positive'),
 Row(Name=u'Ivan Nova',

In [342]:
pitchers_df_final = pitcher_joined_rdd.toPandas()

In [345]:
#drop the index...dont't need it anymore
pitchers_df_final = pitchers_df_final.drop('Index',1)

In [347]:
pitchers_df_final.head()

Unnamed: 0,Name,Sentiment
0,Adam Wainwright,positive
1,Chad Bettis,positive
2,Dan Jennings,positive
3,Zack Greinke,positive
4,Robbie Ray,positive


In [349]:
#cleaned up data to load into postgress
pitchers_df_final.to_csv("../Data_Engineering_Project/data/pitchers_2016_06_26_sentiment.csv", sep=',', header=False)

In [358]:
#Now for the batters
with open('../Data_Engineering_Project/twitter_batter_data.json') as data_file:    
    batters_tweets = json.load(data_file)
batters_keys =batters_tweets.keys()
batter_rdd = sc.parallelize(batters_keys)    
print "Number of batters:", len(batters_keys)
print "#"*40
#batter_rdd.map(lambda x: batters_tweets[x]).take(2)

Number of batters: 249
########################################


In [359]:
#build batter sentiment rdd and sanity check
batter_sentiment_rdd = sentiment_tweets(batter_rdd, batters_tweets)
print batter_sentiment_rdd.take(5), "Batter count:", batter_sentiment_rdd.count()

[('positive', 0), ('negative', 1), ('positive', 2), ('negative', 3), ('positive', 4)] Batter count: 249


In [360]:
#create dataframe and sanity check
batter_sentiment_df = sqlcntx(batter_sentiment_rdd, "Sentiment","Index")
batter_sentiment_df.take(5)

[Row(Sentiment=u'positive', Index=0),
 Row(Sentiment=u'negative', Index=1),
 Row(Sentiment=u'positive', Index=2),
 Row(Sentiment=u'negative', Index=3),
 Row(Sentiment=u'positive', Index=4)]

In [361]:
batter_keys = key_rdd(batters_tweets)    
batter_keys.take(10)

[('Rougned Odor', 0),
 ('Justin Upton', 1),
 ('Anthony Rendon', 2),
 ('Chris Coghlan', 3),
 ('Brad Miller', 4),
 ('Jason Heyward', 5),
 ('Jeff Francoeur', 6),
 ('Melvin Upton Jr.', 7),
 ('Norichika Aoki', 8),
 ('Ryan Braun', 9)]

In [362]:
#rename columns for batters
batters_key_sprkdf = sqlcntx(batter_keys, "Name", "Index")
batters_key_sprkdf.take(3)

[Row(Name=u'Rougned Odor', Index=0),
 Row(Name=u'Justin Upton', Index=1),
 Row(Name=u'Anthony Rendon', Index=2)]

In [363]:
batter_joined_rdd = sprk_df_join(batters_key_sprkdf,batter_sentiment_df,"Index","Index")

In [364]:
batter_joined_rdd.take(30)

[Row(Name=u'John Jaso', Index=31, Sentiment=u'negative'),
 Row(Name=u'Nick Markakis', Index=231, Sentiment=u'positive'),
 Row(Name=u'Trevor Story', Index=32, Sentiment=u'positive'),
 Row(Name=u'David Ortiz', Index=232, Sentiment=u'positive'),
 Row(Name=u'Manny Machado', Index=33, Sentiment=u'positive'),
 Row(Name=u'Ben Zobrist', Index=233, Sentiment=u'negative'),
 Row(Name=u'Albert Pujols', Index=34, Sentiment=u'positive'),
 Row(Name=u'Todd Frazier', Index=234, Sentiment=u'positive'),
 Row(Name=u'Billy Hamilton', Index=35, Sentiment=u'positive'),
 Row(Name=u'Andrelton Simmons', Index=235, Sentiment=u'positive'),
 Row(Name=u'Ramon Flores', Index=36, Sentiment=u'positive'),
 Row(Name=u'Coco Crisp', Index=236, Sentiment=u'positive'),
 Row(Name=u'Starlin Castro', Index=37, Sentiment=u'positive'),
 Row(Name=u'Brian Dozier', Index=237, Sentiment=u'positive'),
 Row(Name=u'Jimmy Rollins', Index=38, Sentiment=u'positive'),
 Row(Name=u'Martin Prado', Index=238, Sentiment=u'negative'),
 Row(Name=

In [367]:
batters_df_final = batter_joined_rdd.toPandas()

In [368]:
batters_df_final.head()

Unnamed: 0,Name,Index,Sentiment
0,John Jaso,31,negative
1,Nick Markakis,231,positive
2,Trevor Story,32,positive
3,David Ortiz,232,positive
4,Manny Machado,33,positive


In [369]:
batters_df_final = batters_df_final.drop('Index',1)

In [370]:
batters_df_final.head()

Unnamed: 0,Name,Sentiment
0,John Jaso,negative
1,Nick Markakis,positive
2,Trevor Story,positive
3,David Ortiz,positive
4,Manny Machado,positive


In [371]:
#cleaned up data to load into postgress
batters_df_final.to_csv("../Data_Engineering_Project/data/batters_2016_06_26_sentiment.csv", sep=',', header=False)