## Final Code for the Influencers DataFrame

In [2]:
# S3 credentials and mount configuration.
# NOTE: never commit real keys; the values below are placeholders.
ACCESS_KEY = "xxx"
# '/' inside the secret must be URL-encoded because the key is embedded in the s3a URI.
SECRET_KEY = "xxx".replace("/", "%2F")
AWS_BUCKET_NAME = "project4capstones3"
MOUNT_NAME = "twitter_246821242"
MOUNT_POINT = "/mnt/%s" % MOUNT_NAME

# dbutils.fs.mount raises if the mount point already exists, so only mount when
# it is absent. This keeps the cell safe to re-run.
if not any(mount.mountPoint == MOUNT_POINT for mount in dbutils.fs.mounts()):
    dbutils.fs.mount("s3a://%s:%s@%s" % (ACCESS_KEY, SECRET_KEY, AWS_BUCKET_NAME), MOUNT_POINT)

display(dbutils.fs.ls(MOUNT_POINT))

import json
# Load the raw tweet JSON files into a Spark DataFrame.
# NOTE(review): this reads from /mnt/twitter, not from MOUNT_POINT defined above --
# confirm that /mnt/twitter is an existing mount of the same bucket.
data = sqlContext.read.json("/mnt/twitter/2017/06/*/*/project*")

In [3]:
# Convert the Spark DataFrame into a pandas DataFrame; toPandas() collects every row
# onto the driver, so the data must fit in driver memory.
df = data.toPandas()

In [4]:
##### FILTER df with appropriate features #####
# Columns kept for the influence / sentiment computations and for the final output table.
features_of_interest = [
    'user_followers_count',
    'user_friends_count',
    'user_statuses_count',
    'rt_status_retweet_count',
    'user_listed_count',
    'status_num_mentions',
    'status_retweet_count',
    'user_name',
    'user_img_url',
    'user_id',
    'searched_names',
    'status_sentMag',
    'status_sentScore',
    'status_text',
    'rt_status_favorite_count',
]

# keep only the selected columns, then discard every row that has a missing value in any of them
df_reduced = df[features_of_interest].dropna()

In [5]:
##### CONVERT numeric features from string to numeric #####
import pandas as pd

# variables are unicode. Need to change a few of them to numeric data types
numeric_features = ['user_followers_count', 'user_friends_count', 'user_statuses_count', 'rt_status_retweet_count',
                    'status_num_mentions', 'status_retweet_count', 'user_listed_count', 'status_sentMag', 'status_sentScore', 'rt_status_favorite_count']

# Replace the literal string 'None' by 0, then parse every column as a number.
# pd.to_numeric(errors='coerce') turns values that cannot be parsed into NaN, which is
# what the deprecated (and since removed) DataFrame.convert_objects(convert_numeric=True) did.
df_reduced[numeric_features] = (
    df_reduced[numeric_features]
    .replace('None', 0)
    .apply(pd.to_numeric, errors='coerce')
)

# Influence Score

In [7]:
##### CREATE new features to calculate influence score #####
def _scale_by_max(values):
    """Return `values` as floats divided by the column maximum.

    The maximum is computed once (Series.max ignores NaN), instead of calling the
    builtin max() again for every element.
    """
    return values.astype(float) / values.max()


### REACH ###
# follower_count: normalize followers_count by its maximum.
# NOTE: this overwrites the raw follower count in df_reduced; the two features below
# are therefore computed from the normalized value.
df_reduced['user_followers_count'] = _scale_by_max(df_reduced['user_followers_count'])

# ratio_followers_following (+1 in the denominator avoids division by zero)
df_reduced['ratio_followers_following'] = df_reduced['user_followers_count'] / (df_reduced['user_friends_count'] + 1) + 10

# reputation
df_reduced['reputation'] = (df_reduced['user_followers_count'] * df_reduced['user_statuses_count']) / (df_reduced['user_friends_count'] + 1) + 10

# listed_count
df_reduced['listed_count'] = _scale_by_max(df_reduced['user_listed_count'])


### RESONANCE #####
# mentions_received
df_reduced['mentions_received'] = _scale_by_max(df_reduced['status_num_mentions'])

# retweets_received
df_reduced['retweets_received'] = _scale_by_max(df_reduced['rt_status_retweet_count'])


### ENGAGEMENT ###
# mentions_ratio
df_reduced['mentions_ratio'] = df_reduced['status_num_mentions'] / (df_reduced['user_statuses_count'] + 1) + 1

# retweets_given_ratio
df_reduced['retweets_given_ratio'] = df_reduced['status_retweet_count'] / (df_reduced['user_statuses_count'] + 1) + 1

# number_of_tweets
df_reduced['number_of_tweets'] = _scale_by_max(df_reduced['user_statuses_count'])

In [8]:
# Combine the engineered features into a single weighted sum, then rescale it so that
# the highest-scoring row gets 10.

df_final = pd.DataFrame()

### Influence Score
# (feature column in df_reduced, weight) -- summed in this order.
influence_weights = [
    ('user_followers_count', 0.741877),
    ('ratio_followers_following', 0.331026),
    ('reputation', 0.320914),
    ('listed_count', 1.000000),
    ('mentions_received', 0.613449),
    ('retweets_received', 0.435626),
    ('number_of_tweets', 0.273192),
    ('mentions_ratio', 0.189703),
    ('retweets_given_ratio', 0.185087),
]
raw_influence = sum(df_reduced[feature] * weight for feature, weight in influence_weights)

# Scale relative to the maximum (computed once) and round to 2 decimals; the result is
# at most 10. Vectorised so the maximum is not recomputed for every row and an empty
# frame does not raise.
df_final['Influencer_Score'] = (raw_influence / raw_influence.max() * 10).round(2)

### Sentiment score
df_final['Sentiment_Score'] = df_reduced['status_sentScore']
df_final['Sentiment_Magnitude'] = df_reduced['status_sentMag']

In [9]:
### adding extra columns for Flask app
df_final['User_ID'] = df_reduced['user_id']
df_final['User_name'] = df_reduced['user_name']
# IMPORTANT: the follower count must come from the raw `df`, because
# df_reduced['user_followers_count'] has been overwritten with its normalized value.
# The raw column was never converted from string, so convert it here (with the same
# 'None' -> 0 rule used for the other numeric features); otherwise the later 'max'
# aggregation compares the values as text instead of as numbers.
# Assignment aligns on the index, so only the rows present in df_final are kept.
df_final['Number_of_Followers'] = pd.to_numeric(df['user_followers_count'].replace('None', 0), errors='coerce')
df_final['Number_of_Retweets'] = df_reduced['rt_status_retweet_count']
df_final['Number_of_Likes'] = df_reduced['rt_status_favorite_count']
df_final['Img_URL'] = df_reduced['user_img_url']
df_final['Searched_Names'] = df_reduced['searched_names']
df_final['Tweets'] = df_reduced['status_text']

In [10]:
# discard every row that still has a missing value in any column
df_final = df_final.dropna(axis=0, how='any')

In [11]:
# One output row per (user, searched name) pair, so that tweets associated with
# different searched names are never mixed when the influence and sentiment scores
# are averaged.
aggregations = {
    'Influencer_Score': 'mean',
    'Img_URL': 'first',
    'User_ID': 'first',
    # all tweets of the group are concatenated into one string (no separator)
    'Tweets': lambda tweets: ''.join(tweets),
    'Sentiment_Score': 'mean',
    'Sentiment_Magnitude': 'mean',
    'Number_of_Followers': 'max',
    'Number_of_Retweets': 'max',
    'Number_of_Likes': 'max',
}
grouped_by_user = df_final.groupby(['User_name', 'Searched_Names'])
df_final = grouped_by_user.agg(aggregations)

In [12]:
# turn the group keys (User_name, Searched_Names) back into regular columns
# and restore a 0..n-1 integer index
df_final = df_final.reset_index(drop=False)

In [13]:
##### CREATE labels for influence score (after we have aggregated by User Name and calculated the mean)
def _influencer_label(score):
    """Map an influence score to its tier label.

    >= 8 -> 'Top Influencer', [6, 8) -> 'Mid-Tier Influencer',
    [3, 6) -> 'Micro Influencer', anything else (including NaN) -> 'Not an Influencer'.
    """
    # NOTE(review): an earlier comment gave the Micro tier as "between 4 & 6", but the
    # code has always used 3 as the lower bound; 3 is kept here -- confirm which is intended.
    if score >= 8:
        return 'Top Influencer'
    if score >= 6:
        return 'Mid-Tier Influencer'
    if score >= 3:
        return 'Micro Influencer'
    return "Not an Influencer"


# A single column assignment replaces the row-by-row .loc writes; it also creates
# the column when df_final is empty, which later column selection relies on.
df_final['Influencer_Label'] = df_final['Influencer_Score'].map(_influencer_label)

In [14]:
##### CREATE labels for sentiment score (after we have aggregated by User Name and calculated the mean)
def _sentiment_label(score, magnitude):
    """Map a (mean score, mean magnitude) pair to a sentiment label.

    -0.4 < score < 0.4            -> 'Neutral'
    score <= -0.4, magnitude < 3  -> 'Negative'
    score >=  0.4, magnitude < 3  -> 'Positive'
    score <= -0.4, magnitude >= 3 -> 'Very Negative'
    score >=  0.4, magnitude >= 3 -> 'Very Positive'
    If no rule matches (NaN score or NaN magnitude) the score is returned unchanged.
    """
    if -0.4 < score < 0.4:
        return 'Neutral'
    if score <= -0.4:
        polarity = 'Negative'
    elif score >= 0.4:
        polarity = 'Positive'
    else:
        return score
    if magnitude < 3:
        return polarity
    if magnitude >= 3:
        return 'Very ' + polarity
    return score


# Replace the numeric score by its label in one assignment, instead of writing strings
# into the float column one row at a time with .loc.
df_final['Sentiment_Score'] = [
    _sentiment_label(score, magnitude)
    for score, magnitude in zip(df_final['Sentiment_Score'], df_final['Sentiment_Magnitude'])
]

In [15]:
# The magnitude was only needed to derive the sentiment label; remove the column.
# `axis` is passed by keyword because the positional form is deprecated and rejected
# by pandas 2.0 and later.
df_final = df_final.drop('Sentiment_Magnitude', axis=1)

In [16]:
# put the columns in their final order, then sort with the highest influencer score first
column_order = [
    'Img_URL',
    'User_name',
    'Number_of_Followers',
    'Number_of_Retweets',
    'Number_of_Likes',
    'Influencer_Score',
    'Sentiment_Score',
    'Searched_Names',
    'Tweets',
    'Influencer_Label',
    'User_ID',
]
df_final = df_final[column_order].sort_values(by='Influencer_Score', ascending=False)

In [17]:
# Display the final table as the cell output (renders the whole frame).
df_final

In [18]:
from pyspark.sql.types import StructField, StructType, StringType
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql import SQLContext
import json

In [19]:
# Build a SQLContext on top of the existing SparkContext `sc`.
# NOTE(review): an earlier cell already uses a ready-made `sqlContext`; confirm whether
# a second context is needed here.
sqlCtx = SQLContext(sc)
# Convert the pandas result back into a Spark DataFrame.
influencer_df=sqlCtx.createDataFrame(df_final)
# Show the resulting object's type as the cell output.
type(influencer_df)

In [20]:
# Persist the result as the table 'influencer_df', replacing any existing table of that name.
table_writer = influencer_df.write.mode('overwrite')
table_writer.saveAsTable('influencer_df')

In [21]:
# List the column names of the saved Spark DataFrame as the cell output.
influencer_df.columns