# In [14]:
########## 1. Load Packages
import warnings
warnings.simplefilter("ignore")

import importlib
import pandas as pd
import numpy as np
import re
from io import StringIO
import itertools
import os 
import time
import datetime

from io import StringIO # python3; python2: BytesIO 
import boto3

import emoji
import random 
import math

from sklearn.model_selection import train_test_split
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

########## 2. Set Parameters 

# Number of rows to skip before the header row of an input file.
# Note: Python uses zero-based indexing, so skiprow=0 begins at the first row of file,
# while skiprow=1 begins at the second row.
skiprow=0

# Name of the column that contains the text data for analysis
text_column = "text"

filepath = "data/"

# S3 bucket the processed CSV files are listed in and read from
import_bucket = "joe-exotic-2020"

key = 'processed' # already created on S3
csv_buffer = StringIO()
s3_resource = boto3.resource('s3')
s3 = boto3.client('s3')

# NOTE: despite its name this is used as a key prefix ("folder") for the result files
# written into import_bucket, not as a separate bucket.
results_bucket = 'full_clean' # already created on S3

######### 3. Import Data from S3 bucket
# Import Flattened Data
# Build "<bucket>/<key>" strings for every object whose key contains "processed.csv".
# The parts are joined with a literal '/' (not os.path.join) because import_data splits
# on '/', and the '.' is escaped so that it only matches a real dot.
filelist = [
    obj.bucket_name + '/' + obj.key
    for obj in s3_resource.Bucket(name=import_bucket).objects.all()
    if re.search(r"processed\.csv", obj.key)
]

# Only the first two matching files are processed.
filelist = filelist[:2]

def _read_s3_csv(path):
    '''Download one "<bucket>/<key>" CSV object from S3 and parse it into a Pandas dataframe.'''
    # Split on the FIRST '/' only: everything after the bucket name is the object key,
    # and keys may themselves contain '/'.
    object_key = path.split('/', 1)[1]
    csv_obj = s3.get_object(Bucket=import_bucket, Key=object_key)
    csv_string = csv_obj['Body'].read().decode('utf-8')
    return pd.read_csv(StringIO(csv_string))


def import_data(filelist):
    '''Read tweet CSV files from S3 into one cleaned Pandas dataframe.
    Inputs: Either a list/tuple of "<bucket>/<key>" strings (every file is read, reduced to
        the columns listed below and the pieces are concatenated) or a single
        "<bucket>/<key>" string (the file is read with all of its columns).
        The bucket part of each path is ignored; objects are always fetched from the
        module-level import_bucket through the module-level s3 client.
    Outputs: Pandas dataframe with one row per distinct id_str, repeated header rows and
        rows with a non-numeric user.id removed, user.id converted to a number and
        created_at / user.created_at converted to datetimes.
    Raises: ValueError if an empty list is passed.
    '''
    # Columns kept from every file when a list of files is imported.
    # NOTE(review): 'user.protected' is listed twice, so that column ends up duplicated in
    # the result. Kept as-is to leave the returned schema unchanged -- confirm and dedupe.
    keep_columns = ['created_at', 'text', 'source', 'retweet_count', 'favorite_count', 'lang', 'possibly_sensitive',
                    'withheld_in_countries', 'place.country', 'quoted_status_id', 'user.created_at',
                    'user.description', 'user.favourites_count', 'user.followers_count', 'user.friends_count',
                    'user.geo_enabled', 'user.has_extended_profile', 'user.lang', 'user.listed_count',
                    'user.location', 'user.name', 'user.protected', 'user.screen_name',
                    'user.time_zone', 'user.verified', 'user.protected', 'user.default_profile',
                    'is_quote_status', 'quoted_status.user.followers_count', 'quoted_status.user.friends_count',
                    'retweeted_status.user.followers_count', 'retweeted_status.user.friends_count', 'user.url',
                    'in_reply_to_status_id', 'id_str', 'user.id', 'suspended', 'user.statuses_count', 'id']

    if isinstance(filelist, (list, tuple)):
        if not filelist:
            raise ValueError("No files to import: filelist is empty.")
        # Subset vars - keep only those that are relevant.
        dataframes = [_read_s3_csv(filename)[keep_columns] for filename in filelist]
        df = pd.concat(dataframes, ignore_index=True, sort=False)
    else:
        # Read in single file
        df = _read_s3_csv(filelist)

    # Check for text_column (best effort: only warns, the import carries on)
    if text_column not in df.columns:
        print("Cannot find text column. Please confirm that the text_column and skiprow parameters are updated.")

    # Clean up rows and drop duplicates based on tweet id string.
    df = df.drop_duplicates(subset=['id_str'])
    # Drop rows whose created_at is the literal "False" or a repeated header line.
    is_real_row = (df['created_at'] != "False") & (df['created_at'] != "created_at")
    # .copy() so the assignments below write to an independent frame, not to a slice.
    df = df[is_real_row].copy()
    # Non-numeric user ids become NaN and those rows are dropped.
    df['user.id'] = pd.to_numeric(df['user.id'], errors='coerce')
    df = df[df['user.id'].notna()].copy()

    # Format dates
    df["user.created_at"] = pd.to_datetime(df["user.created_at"], format='%a %b %d %H:%M:%S %z %Y')
    df["created_at"] = pd.to_datetime(df["created_at"], format='%Y-%m-%d %H:%M:%S').dt.tz_localize('UTC')

    return df

df = import_data(filelist)

######## 4. Split into Train vs. Valid vs. Test (Note particular method due to panel data)
# Target shares of users: Train 60% / Validation 20% / Test 20%.
#
# Method taken from:
# https://towardsdatascience.com/assigning-panel-data-to-training-testing-and-validation-groups-for-machine-learning-models-7017350ab86e
# The assignment is made per user id rather than per row, so that all rows of one
# user end up in the same group and the three groups stay independent.

# One row per distinct user id.
pd_id = df[['user.id']].drop_duplicates(subset='user.id')

# Give every user a reproducible pseudo-random number between 0 and 1.
np.random.seed(42)
pd_id['wookie'] = np.random.randint(0, 10000, len(pd_id)) / 10000

# Map the random number onto a group: up to 0.60 -> training, up to 0.80 -> validation,
# everything above -> testing.
pd_id['MODELING_GROUP'] = np.select(
    [pd_id.wookie <= 0.60, pd_id.wookie <= 0.80],
    ['TRAINING', 'VALIDATION'],
    default='TESTING',
)

# Number of users per group.
tips_summed = pd_id.groupby(['MODELING_GROUP'])['wookie'].count()

# Attach each user's group (and random number) to every one of that user's records.
df = df.sort_values(by='user.id')
pd_id = pd_id.sort_values(by='user.id')
df = df.merge(pd_id, how='inner', on=['user.id'])


def _modeling_subset(group_label):
    '''Return (features, target) for the rows of df assigned to group_label.'''
    rows = df[df['MODELING_GROUP'] == group_label]
    return rows.drop(columns=['wookie', 'MODELING_GROUP']), rows.suspended


# Train
X_train, y_train = _modeling_subset('TRAINING')

# Validation
X_valid, y_valid = _modeling_subset('VALIDATION')

# Test
X_test, y_test = _modeling_subset('TESTING')

########## 5. Preprocessing pipeline

# Create functions for pipeline
# Anything that looks like an HTML/XML tag: '<', one or more non-'>' characters, '>'.
TAG_RE = re.compile(r'<[^>]+>')


def remove_tags(text):
    '''Return text with every tag-like "<...>" span deleted.'''
    return re.sub(TAG_RE, '', text)


def preprocess_text(sen):
    '''Reduce a piece of text to letters separated by single spaces.
    Inputs: String to clean
    Outputs: Cleaned string
    '''
    cleaned = remove_tags(sen)

    # Substitutions are applied in this order:
    #   1. every non-letter character (punctuation, digits, ...) becomes a space
    #   2. a single letter surrounded by whitespace is replaced by one space
    #   3. runs of whitespace collapse into one space
    substitutions = (
        ('[^a-zA-Z]', ' '),
        (r"\s+[a-zA-Z]\s+", ' '),
        (r'\s+', ' '),
    )
    for pattern, replacement in substitutions:
        cleaned = re.sub(pattern, replacement, cleaned)

    return cleaned

class CombinedAttributesAdder(BaseEstimator, TransformerMixin):
    '''Stateless transformer that adds engineered tweet-level and user-level features.

    transform() works on a copy of its input, derives the features listed in the final
    column selection (account age, tweeting pace, hashtag/mention/URL counts, follower
    and friend rates, profile flags, ...), cleans the text fields and returns a dataframe
    restricted to that fixed set of columns.

    The input needs, among others, the columns 'id', 'user.id', 'text', 'created_at' and
    'user.created_at' (both datetimes), 'user.statuses_count', 'user.followers_count',
    'user.friends_count', 'quoted_status_id', 'user.url', 'user.location',
    'user.screen_name', 'user.description', 'user.name', 'in_reply_to_status_id' and
    'suspended'.

    Several steps are inner merges on 'user.id', so the returned frame does not carry the
    index of the input.
    '''
    def __init__(self, tweets_per_minute = True): # no *args or **kargs
        # Stored as an attribute but not read anywhere in this class.
        self.tweets_per_minute = tweets_per_minute
    def fit(self, X, y=None):
        return self  # nothing else to do
    def transform(self, X):
        '''Return a new dataframe with the engineered features added.
        Inputs: Pandas dataframe of tweets (left unmodified)
        Outputs: Pandas dataframe with the original and the derived columns
        '''
        X_train_2 = X.copy()
        # User Age in whole days (tweet created_at - account created_at)
        X_train_2["user_age"] = (X_train_2["created_at"] - X_train_2["user.created_at"]).dt.days
        # Tweets per day
        # NOTE(review): user_age is 0 when the tweet was sent within a day of account
        # creation, so this ratio (and the *_rate columns below) can be inf/NaN.
        X_train_2['tweets_per_day'] = X_train_2['user.statuses_count'].astype(float)/X_train_2["user_age"] 
        # Minutes since the same user's previous tweet (NaN for a user's first tweet).
        # total_seconds() is used because .dt.seconds only holds the seconds component
        # of the gap and silently drops whole days.
        X_train_2["since_last_tweet_mins"] = X_train_2.sort_values(['user.id','created_at']).groupby('user.id')['created_at'].diff().dt.total_seconds().div(60)
        # Per-user min / max / mean of that gap, attached to every row of the user
        for stat in ('min', 'max', 'mean'):
            per_user_gap = X_train_2.groupby(['user.id'], sort=False)['since_last_tweet_mins'].agg(stat).to_frame('since_last_tweet_mins_' + stat)
            X_train_2 = pd.merge(X_train_2, per_user_gap, on = ["user.id"])

        # Average number of tweets per active hour / active day of each user
        X_train_2['date'] = X_train_2['created_at'].dt.date
        X_train_2['hour'] = X_train_2['created_at'].dt.hour
        X_train_2 = pd.merge(X_train_2, X_train_2[['user.id', 'date', 'hour', 'id']].groupby(['user.id', 'date', 'hour']).count().groupby('user.id', sort=False)["id"].mean().reset_index(name ='avg_tweets_per_hr'), on = ["user.id"])
        X_train_2 = pd.merge(X_train_2, X_train_2[['user.id', 'date', 'id']].groupby(['user.id', 'date']).count().groupby('user.id', sort=False)["id"].mean().reset_index(name ='avg_tweets_per_day'), on = ["user.id"])

        # Turn quoted_status_id into a 1/0 flag (1 = a quoted status id is present)
        X_train_2.loc[X_train_2['quoted_status_id'].notna(), 'quoted_status_id'] = 1
        X_train_2.loc[X_train_2['quoted_status_id'].isna(), 'quoted_status_id'] = 0

        # Counts are taken on the raw text, before it is cleaned further down.
        # Missing text is treated as an empty string so the counts are 0 instead of the
        # regex / len() calls failing on NaN.
        raw_text = X_train_2['text'].fillna('')
        X_train_2['no_hashtags'] = raw_text.apply(lambda x: len(re.findall(r"#(\w+)", x)))
        X_train_2['no_mentions'] = raw_text.apply(lambda x: len(re.findall(r"@(\w{1,15})", x)))
        X_train_2['no_urls'] = raw_text.apply(lambda x: len(re.findall(r"(?P<url>https?://[^\s]+)", x)))
        X_train_2['tw_len'] = raw_text.apply(len)
        X_train_2['followers_per_followees'] = X_train_2['user.followers_count'].astype('float')/X_train_2['user.friends_count'].astype('float')

        # URLs (share of each user's tweets that contain at least one)
        X_train_2["containsURL"] = (X_train_2['no_urls']  > 0).astype(int)
        url_counts = X_train_2.groupby('user.id').agg({'created_at':'count', 
                         'containsURL':'sum'})
        url_counts['user.urls_per_tweet'] = url_counts['containsURL']/url_counts['created_at']
        X_train_2 = pd.merge(X_train_2, url_counts[['user.urls_per_tweet']], on = ["user.id"])  

        # Hashtags, Mentions, and URLS (average number per tweet for each user)
        url_counts = X_train_2.groupby('user.id').agg({'created_at':'count', 
                 'no_hashtags':'sum', 'no_mentions':'sum', 'no_urls':'sum'})
        url_counts['no_hashtags_per_tweet'] = url_counts['no_hashtags']/url_counts['created_at']
        url_counts['no_mentions_per_tweet'] = url_counts['no_mentions']/url_counts['created_at']
        url_counts['no_urls_per_tweet'] = url_counts['no_urls']/url_counts['created_at']
        X_train_2 = pd.merge(X_train_2, url_counts[['no_hashtags_per_tweet', 'no_mentions_per_tweet', 'no_urls_per_tweet']], on = ["user.id"])  

        X_train_2['user.followers_count'] = X_train_2['user.followers_count'].astype('float')
        X_train_2['user.friends_count'] = X_train_2['user.friends_count'].astype('float')

        # Pace of follower and friend add-on during collected time period:
        # daily mean counts per user -> day-over-day change -> mean change per user.
        # The two numeric columns are selected BEFORE .mean() so the aggregation never
        # touches text/datetime columns.
        avg_friends_per_day = X_train_2.groupby(['user.id', 'date'], as_index=True)[['user.followers_count', 'user.friends_count']].mean()
        avg_friends_change = avg_friends_per_day.sort_values(['user.id','date']).groupby('user.id').diff().rename(columns={'user.followers_count':'user.followers_countdailychange','user.friends_count' : 'user.friends_countdailychange'})
        X_train_2 = pd.merge(X_train_2, avg_friends_change.groupby(['user.id'], as_index=True)[['user.followers_countdailychange', 'user.friends_countdailychange']].mean(), on = ["user.id"]) 

        # Pace of follower and friend add-on overall 
        X_train_2['user.friend_rate'] = X_train_2['user.friends_count']/X_train_2['user_age']
        X_train_2['user.followers_rate'] = X_train_2['user.followers_count']/X_train_2['user_age']

        # Profile flags and screen-name shape
        X_train_2['user.has_url'] = (X_train_2['user.url'].fillna(False) != False).astype(int)
        X_train_2['user.has_location'] = (X_train_2['user.location'].fillna(False) != False).astype(int)
        X_train_2['user.screen_name.digit_length'] = X_train_2['user.screen_name'].apply(lambda x: len(re.sub("[^0-9]", "", x)) if pd.notnull(x) else x)
        X_train_2['user.screen_name.length'] = X_train_2['user.screen_name'].apply(lambda x: len(x) if pd.notnull(x) else x)

        # Convert emojis to words in tweet, then strip tags / non-letters
        X_train_2['text'] = X_train_2['text'].apply(lambda x: emoji.demojize(x, delimiters=("", " ")) if pd.notnull(x) else x)
        X_train_2['text'] = X_train_2['text'].apply(lambda x: preprocess_text(x) if pd.notnull(x) else x)
        X_train_2['text'] = X_train_2['text'].apply(lambda x: remove_tags(x) if pd.notnull(x) else x)

        # Convert emojis to words in bio 
        X_train_2['user.description'] = X_train_2['user.description'].apply(lambda x: emoji.demojize(x, delimiters=("", " ")) if pd.notnull(x) else x)

        # Convert emojis to words in name
        X_train_2['user.name'] = X_train_2['user.name'].apply(lambda x: emoji.demojize(x, delimiters=("", " ")) if pd.notnull(x) else x)

        # Create binary for whether it is reply or not 
        X_train_2['is_reply'] = (X_train_2['in_reply_to_status_id'].fillna(False) != False).astype(int)

        # Final column order of the output.
        # NOTE(review): 'user.protected' appears twice, which duplicates that column in
        # the output. Kept as-is to leave the output schema unchanged -- confirm and dedupe.
        X_train_2 = X_train_2[['id','created_at', 'text', 'source', 'retweet_count', 'favorite_count', 'lang', 'possibly_sensitive',
                              'withheld_in_countries', 'place.country', 'quoted_status_id', 'user.id', 'user.created_at',
                              'user.description', 'user.favourites_count', 'user.followers_count', 'user.friends_count',
                               'user.geo_enabled', 'user.has_extended_profile', 'user.lang', 'user.listed_count',
                               'user.location', 'user.name', 'user.protected', 'user.screen_name', 'user.statuses_count',
                               'user.time_zone', 'user.verified', 'user.protected', 'user.default_profile',
                               'is_quote_status', 'quoted_status.user.followers_count', 'quoted_status.user.friends_count',
                               'retweeted_status.user.followers_count', 'retweeted_status.user.friends_count', 'user_age', 
                              'tweets_per_day', 'since_last_tweet_mins', 'since_last_tweet_mins_min',
                               'since_last_tweet_mins_max', 'since_last_tweet_mins_mean', 'avg_tweets_per_hr', 
                               'avg_tweets_per_day', 'no_hashtags', 'no_mentions', 'no_urls', 'tw_len',
                              'followers_per_followees', 'containsURL',  'user.urls_per_tweet', 'no_hashtags_per_tweet',
                              'no_mentions_per_tweet', 'no_urls_per_tweet', 'user.followers_countdailychange', 
                               'user.friends_countdailychange', 'user.friend_rate', 'user.followers_rate',
                              'user.has_url', 'user.has_location', 'user.screen_name.digit_length', 
                               'user.screen_name.length', 'is_reply', 'suspended']]
        return X_train_2
    
####### 6. Implement Pipeline and Save Test, Train, Valid Files to S3 bucket
test_pipeline = Pipeline([
        #('imputer', SimpleImputer(strategy="median")),
        ('attribs_adder', CombinedAttributesAdder())
        #('std_scaler', StandardScaler()), # feature scaling
    ])


def _upload_csv(data, filename):
    '''Write data (dataframe or series) as CSV to <results_bucket>/<filename> in import_bucket.

    A fresh in-memory buffer is created for every upload. Reusing one shared buffer
    without resetting it would append each CSV to the previous ones, so every uploaded
    file after the first would also contain all earlier files.
    '''
    buffer = StringIO()
    # NOTE(review): the encoding argument is kept from the original call; it may have no
    # effect when writing to an in-memory text buffer -- confirm whether a BOM is wanted.
    data.to_csv(buffer, index=False, encoding = "utf_8_sig")
    s3_resource.Object(import_bucket, results_bucket + '/' + filename).put(Body=buffer.getvalue())


# Train
X_train_tr = test_pipeline.fit_transform(X_train)
_upload_csv(X_train_tr, "x_train.csv")
_upload_csv(y_train, "y_train.csv")

# Test
# (the result variable is reused for every split, as before)
X_train_tr = test_pipeline.fit_transform(X_test)
_upload_csv(X_train_tr, "x_test.csv")
_upload_csv(y_test, "y_test.csv")

# Valid 
X_train_tr = test_pipeline.fit_transform(X_valid)
_upload_csv(X_train_tr, "x_validation.csv")
_upload_csv(y_valid, "y_validation.csv")