# Project Title
### Data Engineering Capstone Project

#### Project Summary
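This project builds a pipeline that recommends people to follow on Stocktwits. It finds each user's closest connections from engagement data, scores users' reputation with an NLP model trained on their messages, and recommends reputable second-degree connections.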

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [2]:
# Do all imports and installs here
import pandas as pd
import datetime as dt
import boto3
import json
from datetime import datetime
from datetime import date
import re
import string


### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use? etc.

Stocktwits is a platform for active traders to exchange ideas. People post messages, and the community can engage by liking or replying to those messages or by following other users. To cultivate a healthy community, we think it is pivotal to recommend the right people for users to follow.

The recommendation process consists of the following steps:
1. Find the 50 people a user has engaged with most closely in the last 3 months.
2. Calculate a reputation score for each user with an NLP model trained on the text of their posted messages.
3. Find second-degree closest connections (i.e., the closest connections of the 50 people from step 1), then recommend only those with good reputation scores.

#### Describe and Gather Data 
Describe the data sets you're using. Where did they come from? What type of information is included?

For step 1, user activities (follows, replies, likes, and mentions) are stored in CSV files.
For step 2, messages and their metadata are stored as JSON, one object per line.
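Because step 1 treats every engagement type as an edge between two users, it can help to stack the four CSV files into one edge list. The sketch below assumes the column layouts used by the `read_csv` calls in this notebook; the toy rows, the `load_edges` name, and labelling each row's `rel_type` by source file are all hypothetical illustrations, not the project's actual loader.

```python
import io

import pandas as pd

# Hypothetical toy rows standing in for the S3 files. Layouts follow this
# notebook's read_csv calls: follow.csv has 3 columns, the other files have 5
# (user_id, engaged_with_user_id, message_id, timestamp, rel_type).
FOLLOW_CSV = "1,3,2009-07-10 22:24:37\n2,3,2009-07-10 22:27:55\n"
LIKE_CSV = "1119436,1616220,100,2019-07-03 00:00:00,like\n"

MESSAGE_COLS = ['user_id', 'engaged_with_user_id', 'message_id', 'timestamp', 'rel_type']


def load_edges(follow_src, message_srcs):
    """Stack engagement files into one table: user_id, engaged_with_user_id, timestamp, rel_type.

    follow_src: path or file-like object for follow.csv.
    message_srcs: dict mapping an engagement type ('reply', 'mention', 'like') to a source.
    """
    frames = [
        pd.read_csv(follow_src, header=None,
                    names=['user_id', 'engaged_with_user_id', 'timestamp'])
        .assign(rel_type='follow')
    ]
    for rel_type, src in message_srcs.items():
        # Keep user_id, engaged_with_user_id and timestamp; label rows by source file.
        df = pd.read_csv(src, header=None, names=MESSAGE_COLS, usecols=[0, 1, 3])
        frames.append(df.assign(rel_type=rel_type))
    edges = pd.concat(frames, ignore_index=True)
    edges['timestamp'] = pd.to_datetime(edges['timestamp'])
    return edges


edges = load_edges(io.StringIO(FOLLOW_CSV), {'like': io.StringIO(LIKE_CSV)})
print(edges)
```

With the real data, the `['Body']` streams returned by `read_from_s3` could be passed in place of the `StringIO` objects.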


In [3]:
# Read in the data here
LAST_DATE = dt.datetime.now().date().strftime("%Y%m%d")
# strftime('%s') is not portable (it depends on the platform C library); timestamp() is.
CURRENT_TIME = str(int(dt.datetime.now().timestamp()))

S3_BUCKET = 'ds-data-store'
S3_PATH = 'graph_data/' + LAST_DATE
S3_CLIENT = boto3.client('s3')

def read_from_s3(filename):
    return S3_CLIENT.get_object(Bucket=S3_BUCKET, Key=S3_PATH + '/{:s}'.format(filename))

# nrows=3 reads only a short preview of each file for exploration.
follow = pd.read_csv(read_from_s3('follow.csv')['Body'], header=None, nrows=3,
                     names=['user_id', 'engaged_with_user_id', 'timestamp'])
reply = pd.read_csv(read_from_s3('reply.csv')['Body'], header=None, nrows=3,
                    names=['user_id', 'engaged_with_user_id', 'message_id', 'timestamp', 'rel_type'],
                    usecols=[0, 1, 3])
mention = pd.read_csv(read_from_s3('mention.csv')['Body'], header=None, nrows=3,
                      names=['user_id', 'engaged_with_user_id', 'message_id', 'timestamp', 'rel_type'],
                      usecols=[0, 1, 3])
like = pd.read_csv(read_from_s3('like.csv')['Body'], header=None, nrows=3,
                   names=['user_id', 'engaged_with_user_id', 'message_id', 'timestamp', 'rel_type'],
                   usecols=[0, 1, 3])
user = pd.read_csv(read_from_s3('user.csv')['Body'], header=None, nrows=3,
                   usecols=[0, 11], names=['user_id', 'suspended'], dtype={'user_id': str})

In [4]:
for df in [follow, reply, mention, like, user]:
    print('\n', df)


    user_id  engaged_with_user_id            timestamp
0        1                     3  2009-07-10 22:24:37
1        2                     3  2009-07-10 22:27:55
2        6                     3  2009-07-12 04:51:15

    user_id  engaged_with_user_id            timestamp
0   434018                318908  2019-07-03 00:00:00
1  2010747               1148584  2019-07-03 00:00:01
2   970851               1474180  2019-07-03 00:00:01

    user_id  engaged_with_user_id            timestamp
0   970851                795541  2019-07-03 00:00:01
1  1131490               1875300  2019-07-03 00:00:05
2  1248076                115915  2019-07-03 00:00:09

    user_id  engaged_with_user_id            timestamp
0  1119436               1616220  2019-07-03 00:00:00
1  1690933               2152331  2019-07-03 00:00:02
2  2141226               2118816  2019-07-03 00:00:03

   user_id  suspended
0       1          0
1       2          0
2       3          0


In [8]:
# Firehose JSON data

In [5]:
def read_file(f):
    """Read a file of line-delimited JSON into a list of dicts, skipping blank lines."""
    with open(f, encoding="utf-8") as fh:
        return [json.loads(line) for line in fh if line.strip()]

msg_data = read_file('st_messages.data')
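As an alternative to the hand-written loop, pandas can parse line-delimited JSON directly with `read_json(..., lines=True)`. The two records below are hypothetical and trimmed to a few of the fields shown in `msg_data[0]`.

```python
import io

import pandas as pd

# Two hypothetical records in the same one-object-per-line layout as st_messages.data.
SAMPLE = (
    '{"id": 1, "body": "first", "created_at": "2015-12-09T21:30:39Z", "user_id": 10}\n'
    '{"id": 2, "body": "second", "created_at": "2015-12-10T08:00:00Z", "user_id": 11}\n'
)

# lines=True parses one JSON object per line; convert_dates parses created_at.
msgs = pd.read_json(io.StringIO(SAMPLE), lines=True, convert_dates=['created_at'])
print(msgs[['id', 'user_id', 'created_at']])
```

Nested fields such as `conversation` or `source` stay as dict-valued columns; `pd.json_normalize` can flatten them, and passing `chunksize` together with `lines=True` reads a large file in pieces.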

In [6]:
msg_data[0]

{'id': 46555952,
 'body': '@BEG007 if u sell and buy back right away then they can put u on free ride violation',
 'created_at': '2015-12-09T21:30:39Z',
 'source': {'id': 1149,
  'title': 'StockTwits for iOS',
  'url': 'http://www.stocktwits.com/mobile'},
 'conversation': {'parent_message_id': 46550884,
  'in_reply_to_message_id': 46555907,
  'parent': False,
  'replies': 5},
 'reshares': {'reshared_count': 0, 'user_ids': []},
 'entities': {'sentiment': None},
 'user_id': 569057,
 'recommended': False}

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

In [8]:
# Performing cleaning tasks here

# 1. Clean messages using reg exp. 
# 2. Aggregate messages by user and by week.

def msg_level_df(twits):
    """
    Convert raw messages to a message-level data frame.
    """
    fmt = '%Y-%m-%dT%H:%M:%SZ'

    user = []
    body = []
    calendar_date = []
    link_desc = []

    for twit in twits:
        user.append(twit['user_id'])
        body.append(twit['body'])
        calendar_date.append(datetime.strptime(twit['created_at'], fmt).date())

        # Use the first link's description when present; otherwise an empty string.
        desc = ''
        links = twit.get('links')
        if links and links[0].get('description') is not None:
            desc = links[0]['description']
        link_desc.append(desc)

    df = pd.DataFrame({'user': user, 'calendar_date': calendar_date,
                       'body': body, 'link_desc': link_desc})

    # Create weekly bins for aggregating messages: days since 2015-12-08 in 7-day intervals.
    # The intervals are right-closed, so day 0 and days after 189 fall outside every bin (NaN).
    df['daydiff'] = df['calendar_date'].apply(lambda x: (x - date(2015, 12, 8)).days)
    bins = range(0, 190, 7)
    df['date_range'] = pd.cut(df['daydiff'], bins)

    return df

# Compile the patterns once at module level instead of on every call.
# Raw strings avoid invalid-escape warnings for \w, \d, etc.
REGEX_USER = re.compile(r'@\w+')
REGEX_LINK = re.compile(r'https?://\S+')
REGEX_PUNCTUATION = re.compile('[{}]'.format(re.escape(string.punctuation)))
REGEX_NON_ASCII = re.compile(r'[^\x00-\x7F]')
REGEX_NUMBER = re.compile(r'\d+')

def clean_twits(s):
    """
    Clean the body of a message: drop @mentions, replace links with 'http',
    strip punctuation, non-ASCII characters and digits, then lowercase.
    """
    s = REGEX_USER.sub('', s)
    s = REGEX_LINK.sub('http', s)
    s = REGEX_PUNCTUATION.sub('', s)
    s = REGEX_NON_ASCII.sub('', s)
    s = REGEX_NUMBER.sub('', s)
    return s.lower()

def text_join(x):
    return ' '.join(x)

def weekly_level_df(df):
    """
    Aggregate messages by user and week.
    """
    txt_df = df[['user','date_range','body','link_desc']].copy()
    txt_df['clean_body'] = txt_df['body'].apply(clean_twits)
    txt_df['clean_link_desc'] = txt_df['link_desc'].apply(clean_twits)
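    # observed=True keeps only (user, week) pairs that actually have messages; without it,
    # grouping on the categorical date_range emits a row for every week for every user.
    txt_weekly = (txt_df.groupby(['user', 'date_range'], observed=True)
                  .agg({'clean_body': text_join, 'clean_link_desc': text_join})
                  .reset_index())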
        
    return txt_weekly

msg_df = msg_level_df(msg_data)
msg_weekly = weekly_level_df(msg_df)

In [9]:
msg_weekly.head()

| | user | date_range | clean_body | clean_link_desc |
|---|---|---|---|---|
| 0 | 39 | (0, 7] | why do you have a picture of unabomber on your... | why do you have a picture of the unabomber on ... |
| 1 | 39 | (7, 14] | clf approaching low video http stock mkt anal... | be on the lookout for a free webinar lessons ... |
| 2 | 39 | (14, 21] | very cool end of year video from http video h... | we look back at the major events of that inve... |
| 3 | 39 | (21, 28] | ytd vwaps spx spy day moving average on ... | safeguarding your email address and webinar ... |
| 4 | 39 | (28, 35] | no trust for spy action but it is interesting ... | stock market analysis education webinars ... |
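The weekly bins in `msg_level_df` come from `pd.cut` with `range(0, 190, 7)`, which produces right-closed intervals. The check below reuses the same reference date and bins on a few hand-picked day offsets to show which days land in which week.

```python
from datetime import date, timedelta

import pandas as pd

START = date(2015, 12, 8)  # same reference date as msg_level_df

# Days 0, 1, 7, 8, 189 and 190 after the reference date.
calendar_date = pd.Series([START + timedelta(days=d) for d in (0, 1, 7, 8, 189, 190)])
daydiff = calendar_date.apply(lambda x: (x - START).days)
date_range = pd.cut(daydiff, range(0, 190, 7))  # same bins as msg_level_df

for d, week in zip(daydiff, date_range):
    print(d, week)
```

Days 0 and 190 get no interval, and `groupby` drops NaN keys by default, so messages posted on the reference date or more than 189 days after it do not appear in `msg_weekly`.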


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

In [11]:
# Data model for find_closest_connections - PostgreSQL table
# The closest_connections table holds, for each user, the 50 users they engaged with most closely in the last three months.


# Data model for calc_reputation_score - CSV file
# The CSV file has two columns: user_id and pred, the reputation score for that user.


# Data model for find_user_rec - PostgreSQL tables
# 1. The recommendations table contains the recommended users.
# 2. For every recommendation, we also store why it was made by recording each closest connection's contribution.
#    The rec_reasons table holds that information.
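The data dictionary describes `weight` in closest_connections as a time-weighted engagement frequency but does not give the formula. The sketch below assumes exponential decay with a hypothetical 30-day half-life, a 90-day window for "the last three months", and equal weight for every engagement type; `closest_connections` and its parameters are illustrative names, not the project's `find_closest_connections`.

```python
import pandas as pd


def closest_connections(edges, as_of, top_n=50, window_days=90, half_life_days=30.0):
    """Top-N engaged-with users per user, weighted by recency.

    edges: DataFrame with user_id, engaged_with_user_id and timestamp (datetime64).
    Each engagement in the window contributes 0.5 ** (age_in_days / half_life_days),
    so an engagement half_life_days old counts half as much as one from as_of.
    """
    as_of = pd.Timestamp(as_of)
    in_window = edges['timestamp'] > as_of - pd.Timedelta(days=window_days)
    not_self = edges['user_id'] != edges['engaged_with_user_id']
    recent = edges[in_window & not_self]

    age_days = (as_of - recent['timestamp']).dt.total_seconds() / 86400
    recent = recent.assign(weight=0.5 ** (age_days / half_life_days))

    weights = (recent.groupby(['user_id', 'engaged_with_user_id'], as_index=False)['weight'].sum()
               .rename(columns={'engaged_with_user_id': 'closest_connection'}))
    # Sort by weight (highest first) within each user, then keep that user's top_n rows.
    weights = weights.sort_values(['user_id', 'weight'], ascending=[True, False])
    return weights.groupby('user_id').head(top_n).reset_index(drop=True)


edges = pd.DataFrame({
    'user_id': [1, 1, 1, 2, 3],
    'engaged_with_user_id': [2, 2, 3, 1, 3],
    'timestamp': pd.to_datetime(['2019-07-01', '2019-06-01', '2019-07-01',
                                 '2019-01-01', '2019-07-01']),
})
print(closest_connections(edges, '2019-07-01'))
```

In this toy input, user 2's January engagement falls outside the window and user 3's self-engagement is dropped, so only user 1 gets closest connections.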

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

1. Find closest connections from the user engagement files.
2. Calculate user reputation scores with the trained NLP model.
3. Find second-degree closest connections and make recommendations by combining reputation scores with connection weights.
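Step 3 can be sketched as a self-join of the closest_connections data followed by a reputation filter, which also yields the per-reason contributions stored in rec_reasons. The scoring here is an assumption: a path's strength is the product of its two connection weights, candidates need a hypothetical `pred >= 0.5`, and each reason's `weight` is its share of the candidate's total strength. `second_degree_recs` is an illustrative name, not the project's `find_user_rec`.

```python
import pandas as pd


def second_degree_recs(cc, reputation, min_score=0.5):
    """Build recommendations and rec_reasons rows from closest connections.

    cc: DataFrame with user_id, closest_connection, weight (like closest_connections).
    reputation: DataFrame with user_id, pred (like the reputation score file).
    Returns (recommendations, rec_reasons) as DataFrames.
    """
    # First hop user_id -> reason_id, joined to second hop reason_id -> rec_id.
    first = cc.rename(columns={'closest_connection': 'reason_id', 'weight': 'w1'})
    second = cc.rename(columns={'user_id': 'reason_id', 'closest_connection': 'rec_id',
                                'weight': 'w2'})
    hops = first.merge(second, on='reason_id')

    # Drop the user themselves and anyone already among their closest connections.
    known = cc[['user_id', 'closest_connection']].rename(columns={'closest_connection': 'rec_id'})
    hops = hops.merge(known, on=['user_id', 'rec_id'], how='left', indicator=True)
    hops = hops[(hops['_merge'] == 'left_only') & (hops['user_id'] != hops['rec_id'])]

    # Keep only candidates whose reputation score clears the threshold.
    good = reputation.loc[reputation['pred'] >= min_score, 'user_id']
    hops = hops[hops['rec_id'].isin(good)]

    # Path strength is w1 * w2; a reason's weight is its share of the candidate's total.
    hops = hops.assign(strength=hops['w1'] * hops['w2'])
    total = hops.groupby(['user_id', 'rec_id'])['strength'].transform('sum')
    rec_reasons = (hops.assign(weight=hops['strength'] / total)
                   [['user_id', 'rec_id', 'reason_id', 'weight']]
                   .reset_index(drop=True))
    recommendations = rec_reasons[['user_id', 'rec_id']].drop_duplicates().reset_index(drop=True)
    return recommendations, rec_reasons


cc = pd.DataFrame({'user_id': [1, 1, 2, 2, 3, 3],
                   'closest_connection': [2, 3, 4, 1, 4, 5],
                   'weight': [0.9, 0.4, 0.8, 0.7, 0.6, 0.9]})
reputation = pd.DataFrame({'user_id': [4, 5], 'pred': [0.9, 0.2]})
recs, reasons = second_degree_recs(cc, reputation)
print(recs)
print(reasons)
```

Here user 1 is recommended user 4 through both of their closest connections (2 and 3), while user 5 is filtered out by its low score.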

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [None]:
# Write code here
# Data model for find_closest_connections - PostgreSQL table
CREATE_CLOSEST_CONNECTIONS = """
    CREATE TABLE graph_recs.closest_connections (
    user_id             INT         NOT NULL,
    closest_connection  INT         NOT NULL,
    weight              DECIMAL     NOT NULL,
    created_at          TIMESTAMPTZ NOT NULL,
    updated_at          TIMESTAMPTZ NOT NULL,
    PRIMARY KEY         (user_id, closest_connection)
    );
"""

# Data model for find_user_rec - PostgreSQL tables
CREATE_RECOMMENDATIONS = """
    CREATE TABLE graph_recs.recommendations (
    user_id             INT         NOT NULL,
    rec_id              INT         NOT NULL,
    created_at          TIMESTAMPTZ NOT NULL,
    updated_at          TIMESTAMPTZ NOT NULL,
    PRIMARY KEY         (user_id, rec_id)
    );
"""

CREATE_REC_REASONS = """
    CREATE TABLE graph_recs.rec_reasons (
    user_id             INT         NOT NULL,
    rec_id              INT         NOT NULL,
    reason_id           INT         NOT NULL,
    weight              DECIMAL     NOT NULL,
    created_at          TIMESTAMPTZ NOT NULL,
    updated_at          TIMESTAMPTZ NOT NULL,
    PRIMARY KEY         (user_id, rec_id, reason_id)
    );
"""
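Because every table keeps both `created_at` (first creation) and `updated_at` (last update), a daily rerun can upsert rather than truncate and reload. The sketch below uses SQLite (3.24 or newer) as an in-memory stand-in, so the `graph_recs` schema prefix is dropped and `TIMESTAMPTZ`/`DECIMAL` become `TEXT`/`REAL`; the `INSERT ... ON CONFLICT ... DO UPDATE` clause is also valid in PostgreSQL 9.5+, assuming the tables live in PostgreSQL as the Step 3 comments state. The `upsert` helper is hypothetical.

```python
import sqlite3

DDL = """
CREATE TABLE closest_connections (
    user_id            INTEGER NOT NULL,
    closest_connection INTEGER NOT NULL,
    weight             REAL    NOT NULL,
    created_at         TEXT    NOT NULL,
    updated_at         TEXT    NOT NULL,
    PRIMARY KEY (user_id, closest_connection)
)
"""

# On a key collision, refresh weight and updated_at but keep the original created_at.
UPSERT = """
INSERT INTO closest_connections (user_id, closest_connection, weight, created_at, updated_at)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT (user_id, closest_connection)
DO UPDATE SET weight = excluded.weight, updated_at = excluded.updated_at
"""


def upsert(conn, rows, now):
    """Insert or refresh (user_id, closest_connection, weight) rows stamped with `now`."""
    conn.executemany(UPSERT, [(u, c, w, now, now) for u, c, w in rows])
    conn.commit()


conn = sqlite3.connect(':memory:')
conn.execute(DDL)
upsert(conn, [(1, 2, 1.5)], '2019-07-01T00:00:00Z')  # first run creates the row
upsert(conn, [(1, 2, 2.0)], '2019-07-02T00:00:00Z')  # next run updates it in place
row = conn.execute('SELECT weight, created_at, updated_at FROM closest_connections').fetchone()
print(row)
```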

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
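For the unit-test item above, a few pytest-style tests for `clean_twits` could look like the following. The function is repeated here (with equivalent regexes) so the block runs on its own; the input strings are made-up examples.

```python
import re
import string

# Equivalent to clean_twits from Step 2, repeated so this block is self-contained.
REGEX_USER = re.compile(r'@\w+')
REGEX_LINK = re.compile(r'https?://\S+')
REGEX_PUNCTUATION = re.compile('[{}]'.format(re.escape(string.punctuation)))
REGEX_NON_ASCII = re.compile(r'[^\x00-\x7F]')
REGEX_NUMBER = re.compile(r'\d+')


def clean_twits(s):
    s = REGEX_USER.sub('', s)
    s = REGEX_LINK.sub('http', s)
    s = REGEX_PUNCTUATION.sub('', s)
    s = REGEX_NON_ASCII.sub('', s)
    s = REGEX_NUMBER.sub('', s)
    return s.lower()


def test_removes_mentions():
    assert clean_twits('@BEG007 free ride violation') == ' free ride violation'


def test_replaces_links_and_strips_symbols():
    # The URL becomes 'http'; '$', '%', '!' and the digit are removed.
    assert clean_twits('Check https://example.com/x $AAPL up 5%!') == 'check http aapl up '


def test_drops_non_ascii():
    assert clean_twits('caf\u00e9 \U0001F680') == 'caf '
```

Saved in a file named `test_*.py`, these functions are collected and run by `pytest`.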
 
Run Quality Checks

In [None]:
# Perform quality checks here
# Two data quality checks are performed:
# 1. Make sure there are no NULL keys in the results.
# 2. Make sure all results were updated in the last 24 hours.
# DataQualityOperator is a custom Airflow operator and `dag` is the pipeline's DAG;
# neither is defined in this notebook.
# COUNT(*) is used because COUNT(column) skips NULLs and would always return 0 here.

run_quality_checks = DataQualityOperator(
    task_id='run_data_quality_checks',
    redshift_conn_id='redshift',
    test_cases=[
        # no NULL keys
        ("SELECT COUNT(*) FROM graph_recs.closest_connections WHERE user_id IS NULL OR closest_connection IS NULL", 0),
        ("SELECT COUNT(*) FROM graph_recs.recommendations WHERE user_id IS NULL OR rec_id IS NULL", 0),
        ("SELECT COUNT(*) FROM graph_recs.rec_reasons WHERE user_id IS NULL OR rec_id IS NULL OR reason_id IS NULL", 0),

        # no stale results
        ("SELECT COUNT(*) FROM graph_recs.closest_connections WHERE updated_at < now() - interval '1 day'", 0),
        ("SELECT COUNT(*) FROM graph_recs.recommendations WHERE updated_at < now() - interval '1 day'", 0),
        ("SELECT COUNT(*) FROM graph_recs.rec_reasons WHERE updated_at < now() - interval '1 day'", 0),
    ],
    dag=dag
)
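Since `DataQualityOperator` is not shown, its exact behavior is unknown. The hypothetical `run_checks` function below is a plain-Python stand-in for the loop such an operator typically runs over `(sql, expected)` pairs, using an in-memory SQLite table instead of the Redshift connection. It also shows why the NULL checks need `COUNT(*)`.

```python
import sqlite3


def run_checks(conn, test_cases):
    """Run (sql, expected) pairs; raise ValueError listing every check whose result differs."""
    failures = []
    for sql, expected in test_cases:
        actual = conn.execute(sql).fetchone()[0]
        if actual != expected:
            failures.append((sql, expected, actual))
    if failures:
        raise ValueError(f'{len(failures)} data quality check(s) failed: {failures}')
    return len(test_cases)


# In-memory stand-in for graph_recs.recommendations with one bad row.
conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE recommendations (user_id INTEGER, rec_id INTEGER)')
conn.executemany('INSERT INTO recommendations VALUES (?, ?)', [(1, 4), (None, 5)])

# COUNT(user_id) ignores NULLs, so this returns 0 even though a NULL user_id exists.
count_col = conn.execute(
    'SELECT COUNT(user_id) FROM recommendations WHERE user_id IS NULL').fetchone()[0]
# COUNT(*) counts rows, so it catches the NULL.
count_star = conn.execute(
    'SELECT COUNT(*) FROM recommendations WHERE user_id IS NULL').fetchone()[0]
print(count_col, count_star)  # 0 1

try:
    run_checks(conn, [('SELECT COUNT(*) FROM recommendations WHERE user_id IS NULL', 0)])
except ValueError as err:
    print(err)
```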

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

table: closest_connections
fields:
  - name: user_id
    description: id of the user
    source: backend database
    
  - name: closest_connection
    description: a user that user_id is closely engaged with
    source: step 1 of the pipeline
    
  - name: weight
    description: a time-weighted engagement frequency
    source: step 1 of the pipeline
    
  - name: created_at 
    description: the timestamp that the record was initially created
    source: step 1 of the pipeline
    
  - name: updated_at
    description: the timestamp when the record was last updated
    source: find_closest_connections   
    
csv file: reputation_score
fields:
  - name: user_id
    description: id of the user
    source: backend database
    
  - name: pred
    description: a number between 0 and 1 (1 being the highest) indicating how similar a user is to Stocktwits' handpicked good users in word choice and information quality.
    source: calc_reputation_scores
    
table: recommendations
description: user recommendations
fields: 
  - name: user_id
    description: id of the user who gets the recommendations
    source: backend database
    
  - name: rec_id
    description: id of the recommended user, chosen from the user's second-degree connections and filtered by reputation score
    source: step 3 of the pipeline

  - name: created_at 
    description: the timestamp that the record was initially created
    source: step 3 of the pipeline
    
  - name: updated_at
    description: the timestamp when the record was last updated
    source: find_user_rec
     
table: rec_reasons
description: reasons for why a user is recommended
fields: 
  - name: user_id
    description: id of the user who gets the recommendations
    source: backend database
    
  - name: rec_id
    description: id of the recommended user, chosen from the user's second-degree connections and filtered by reputation score
    source: step 3 of the pipeline

  - name: reason_id
    description: id of a closest connection of user_id through whom rec_id was recommended
    source: step 3 of the pipeline
    
  - name: weight
    description: contribution of reason_id to the recommendation of rec_id
    source: step 3 of the pipeline
    
  - name: created_at 
    description: the timestamp that the record was initially created
    source: step 3 of the pipeline
    
  - name: updated_at
    description: the timestamp when the record was last updated
    source: find_user_rec
    

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
  * The data was increased by 100x.
  * The data populates a dashboard that must be updated on a daily basis by 7am every day.
  * The database needed to be accessed by 100+ people.

In [1]:
# Please refer to readme for the write up.