# Project Title
### Data Engineering Capstone Project

#### Project Summary
This project combines tweets about Amazon, Apple, Google, Microsoft, and Tesla with the market value data of these companies in a database that is optimized for querying and analysis. An ETL pipeline builds and populates the database.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import numpy as np
import pandas as pd
import datetime
import psycopg2
from sql_queries import fact_tweet_table_insert, user_table_insert, company_table_insert, value_table_insert, time_table_insert

### Step 1: Scope the Project and Gather Data

#### Scope 
A company's market value is affected by many factors, and public opinion about the company could be an important one. This project analyzes the correlation between a company's market value and public opinion about that company. In particular, it tries to understand whether negative tweets about companies affect their market values.

#### Describe and Gather Data

##### Twitter Data
This dataset was published as part of a paper presented at the 2020 IEEE International Conference on Big Data, in the 6th Special Session on Intelligent Data Mining track. It contains over 3 million unique tweets, each with its tweet id, author, post date, text body, and number of comments, likes, and retweets, matched with the related company.

##### Company Value Data
This dataset contains the daily OPEN, CLOSE, VOLUME, HIGH, and LOW values for Amazon, Apple, Google, Microsoft, and Tesla, keyed by date. The values were fetched from the official NASDAQ website.


###### Twitter Dataset

In [2]:
# Read in the data here
path = ""
tweet = pd.read_csv("Tweet.csv")
pd.set_option('display.max_columns', 15)

In [3]:
tweet.head(10)

Unnamed: 0,tweet_id,writer,post_date,body,comment_num,retweet_num,like_num
0,550441509175443456,VisualStockRSRC,1420070457,"lx21 made $10,008 on $AAPL -Check it out! htt...",0,0,1
1,550441672312512512,KeralaGuy77,1420070496,Insanity of today weirdo massive selling. $aap...,0,0,0
2,550441732014223360,DozenStocks,1420070510,S&P100 #Stocks Performance $HD $LOW $SBUX $TGT...,0,0,0
3,550442977802207232,ShowDreamCar,1420070807,$GM $TSLA: Volkswagen Pushes 2014 Record Recal...,0,0,1
4,550443807834402816,i_Know_First,1420071005,Swing Trading: Up To 8.91% Return In 14 Days h...,0,0,1
5,550443808606126081,aaplstocknews,1420071005,Swing Trading: Up To 8.91% Return In 14 Days h...,0,0,1
6,550443809700851716,iknowfirst,1420071005,Swing Trading: Up To 8.91% Return In 14 Days h...,0,0,1
7,550443857142611968,Cprediction,1420071016,Swing Trading: Up To 8.91% Return In 14 Days h...,0,0,1
8,550443857595600896,iknowfirst_br,1420071017,Swing Trading: Up To 8.91% Return In 14 Days h...,0,0,1
9,550443857692078081,Gold_prediction,1420071017,Swing Trading: Up To 8.91% Return In 14 Days h...,0,0,1


###### Market price Data

In [5]:
price = pd.read_json("CompanyValues.json")

In [6]:
price.head(23)
#len(price)
#len(price.volume.unique())

Unnamed: 0,ticker_symbol,day_date,close_value,volume,open_value,high_value,low_value
0,AAPL,2020-05-29,317.94,38399530,319.25,321.15,316.47
1,AAPL,2020-05-28,318.25,33449100,316.77,323.44,315.63
2,AAPL,2020-05-27,318.11,28236270,316.14,318.71,313.09
3,AAPL,2020-05-26,316.73,31380450,323.5,324.24,316.5
4,AAPL,2020-05-22,318.89,20450750,315.77,319.23,315.35
5,AAPL,2020-05-21,316.85,25672210,318.66,320.89,315.87
6,AAPL,2020-05-20,319.23,27876220,316.68,319.52,316.2
7,AAPL,2020-05-19,313.14,25432390,315.03,318.52,313.01
8,AAPL,2020-05-18,314.96,33843130,313.17,316.5,310.3241
9,AAPL,2020-05-15,307.71,41587090,300.35,307.9,300.21


### Step 2: Explore and Assess the Data
#### Explore the Data 

First, some exploratory data analysis (EDA) on both datasets.

In [7]:
# EDA
print("Twitter Data")
twitter_count=tweet.count()
print(f"Rows: {twitter_count}")
print(f"Columns: {len(tweet.columns)}")
print()
print("Market Price")
mrkt_price_count=price.count()
print(f"Rows: {mrkt_price_count}")
print(f"Columns: {len(price.columns)}")

Twitter Data
Rows: tweet_id       3717964
writer         3670691
post_date      3717964
body           3717964
comment_num    3717964
retweet_num    3717964
like_num       3717964
dtype: int64
Columns: 7

Market Price
Rows: ticker_symbol    17528
day_date         17528
close_value      17528
volume           17528
open_value       17528
high_value       17528
low_value        17528
dtype: int64
Columns: 7
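
Because `DataFrame.count()` returns the number of non-null values per column, the "Rows" lines above print a whole Series rather than one number. A minimal sketch of a more compact summary, assuming a hypothetical helper named `summarize` and a made-up toy frame in place of the real data:

```python
import pandas as pd

def summarize(df: pd.DataFrame, name: str) -> dict:
    """Return the row count, column count, and per-column missing values."""
    n_rows, n_cols = df.shape  # shape counts every row, including rows with nulls
    return {
        "name": name,
        "rows": n_rows,
        "columns": n_cols,
        "missing": df.isna().sum().to_dict(),
    }

# Toy frame standing in for the tweet data (values are made up).
demo = pd.DataFrame({"tweet_id": [1, 2, 3], "writer": ["a", None, "c"]})
summary = summarize(demo, "demo")
print(summary)
```

The same call could be applied to `tweet` and `price` to get one row count per dataset.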


#### Cleaning Steps
The following steps are needed to clean the data:
- convert the post_date column from Unix epoch seconds to datetime
- check for missing values
- check for duplicate values and drop them
- add the company ticker symbol to the tweet data
- create unique IDs for the writer and ticker symbol columns

In [8]:
# convert the post_date column to date type
def create_time_tbl(df, col):
    """
    Description: This function converts a numeric date column (Unix epoch seconds) to datetime in place and creates the time table.
    Arguments:
        df: dataframe. 
        col: the numeric column of the dataframe that needs to be converted. 
    Returns:
        time_df: dataframe with one row per timestamp, expanded into hour, day, week, month, year and weekday
    """
    # convert epoch seconds to timestamps; this modifies df in place
    df[col] = pd.to_datetime(df[col], unit='s')

    # expand each timestamp into the columns of the time table
    time_data = []
    for element in df[col]:
        time_data.append([element, element.hour, element.day, element.week, element.month, element.year, element.day_name()])
    column_labels = ('date_time', 'hour', 'day', 'week', 'month', 'year', 'weekday')
    time_df = pd.DataFrame(time_data, columns=column_labels)
    return time_df

In [9]:
# create the time table
time_df = create_time_tbl(tweet, "post_date")
time_df.head()

Unnamed: 0,date_time,hour,day,week,month,year,weekday
0,2015-01-01 00:00:57,0,1,1,1,2015,Thursday
1,2015-01-01 00:01:36,0,1,1,1,2015,Thursday
2,2015-01-01 00:01:50,0,1,1,1,2015,Thursday
3,2015-01-01 00:06:47,0,1,1,1,2015,Thursday
4,2015-01-01 00:10:05,0,1,1,1,2015,Thursday
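
Looping over several million timestamps in Python can be slow. As a sketch of an alternative, the same columns can be derived with the vectorized `.dt` accessor. The function name `build_time_table` is hypothetical, and the two input values are the first two `post_date` values shown earlier:

```python
import pandas as pd

def build_time_table(epoch_seconds: pd.Series) -> pd.DataFrame:
    """Expand Unix timestamps (in seconds) into the columns of the time table."""
    ts = pd.to_datetime(epoch_seconds, unit="s")
    return pd.DataFrame({
        "date_time": ts,
        "hour": ts.dt.hour,
        "day": ts.dt.day,
        # ISO week number, matching Timestamp.week used in the loop version
        "week": ts.dt.isocalendar().week.astype(int),
        "month": ts.dt.month,
        "year": ts.dt.year,
        "weekday": ts.dt.day_name(),
    })

demo = build_time_table(pd.Series([1420070457, 1420070496]))
print(demo)
```

This assumes pandas 1.1 or later, where `Series.dt.isocalendar()` is available.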


In [10]:
def null_values(df):
    """
    Description: This function checks for null values in a dataframe.
    Arguments:
        df: dataframe. 
    Returns:
        columns with the number of null values
    """
    # check for null values
    nulls = df.isnull().sum().sort_values(ascending = False)
    return nulls

In [12]:
# check for null values in tweet data
null_values(tweet)

writer         47273
tweet_id           0
post_date          0
body               0
comment_num        0
retweet_num        0
like_num           0
dtype: int64

The "writer" column is the only column with missing data. Since this column is not going to be used in the analysis, I am going to ignore its missing values for now.

In [13]:
# check for null values in the market price data
null_values(price)

ticker_symbol    0
day_date         0
close_value      0
volume           0
open_value       0
high_value       0
low_value        0
dtype: int64

In [14]:
def drop_repeated_records(df, col):
    """
    Description: This function checks a column for duplicate values and drops the duplicate rows in place, if any.
    Arguments:
        df: dataframe.
        col: column of the dataframe
    Returns:
        number of duplicate records
    """
    
    # rows whose value in col repeats an earlier row
    dups = df[df[col].duplicated()]
    if len(dups) > 0:
        df.drop_duplicates(subset = [col], inplace = True)
    return len(dups)

In [15]:
# check for duplicate values in the tweet data
drop_repeated_records(tweet, "tweet_id")

0

##### Load the company data that contains the company symbol, and merge it with the tweet dataframe

In [16]:
company_symbol = pd.read_csv('Company_Tweet.csv')
# check for duplicate tweet_id values in the company data and drop them
drop_repeated_records(company_symbol, "tweet_id")

618481
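
Before dropping that many rows, it may be worth checking what kind of duplicates they are: rows that repeat in every column, or tweets tagged with more than one ticker symbol, in which case only the first tag survives. A minimal sketch on a made-up stand-in for `Company_Tweet.csv`:

```python
import pandas as pd

# Toy stand-in for Company_Tweet.csv (values are made up).
company_tweet = pd.DataFrame({
    "tweet_id":      [1, 1, 2, 3, 3],
    "ticker_symbol": ["AAPL", "TSLA", "AMZN", "GOOG", "GOOG"],
})

# Rows that repeat an earlier row in every column: true duplicates.
exact_dups = int(company_tweet.duplicated().sum())

# Tweets tagged with more than one distinct ticker symbol.
tickers_per_tweet = company_tweet.groupby("tweet_id")["ticker_symbol"].nunique()
multi_ticker_tweets = int((tickers_per_tweet > 1).sum())

print(f"exact duplicate rows: {exact_dups}")
print(f"tweets with several tickers: {multi_ticker_tweets}")
```

Whether the real file contains multi-ticker tweets is an assumption that this check would confirm or rule out.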

In [17]:
# merge company symbol to the tweet data
tweets = pd.merge(tweet,company_symbol,on='tweet_id',how='inner')
tweets.shape

(3717964, 8)
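
The merged shape matches the tweet count, which suggests every tweet found exactly one ticker symbol. A sketch of how that expectation could be enforced during the merge itself, using the `validate` argument of `pd.merge` on made-up toy frames:

```python
import pandas as pd

# Toy frames standing in for the tweet and company symbol data (values are made up).
tweet_demo = pd.DataFrame({"tweet_id": [1, 2, 3], "body": ["a", "b", "c"]})
symbol_demo = pd.DataFrame({"tweet_id": [1, 2, 3], "ticker_symbol": ["AAPL", "TSLA", "AMZN"]})

# validate="one_to_one" raises a MergeError if tweet_id repeats on either side.
merged = pd.merge(tweet_demo, symbol_demo, on="tweet_id", how="inner", validate="one_to_one")

# An inner join silently drops unmatched tweets, so compare the row counts.
assert len(merged) == len(tweet_demo), "some tweets have no ticker symbol"
print(merged.shape)
```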

In [18]:
# count unique values of ticker symbol 
print("Twitter Data")
print(tweets.ticker_symbol.value_counts())
print()
print("Market Price Data")
price.ticker_symbol.value_counts()

Twitter Data
AAPL     1425013
TSLA      958370
AMZN      534008
GOOG      312590
MSFT      268257
GOOGL     219726
Name: ticker_symbol, dtype: int64

Market Price Data


GOOGL    3085
AMZN     3085
AAPL     3085
MSFT     3085
TSLA     3065
GOOG     2123
Name: ticker_symbol, dtype: int64

##### Create unique IDs for the writer and ticker symbol columns

In [19]:
# create ids for writer column
# note: random draws are not guaranteed to be unique; comparing
# len(set(ids)) with len(names) shows whether any IDs collide
np.random.seed(1)
names = tweets['writer'].unique().tolist()
ids = np.random.randint(low=1e9, high=1e10, size = len(names))
len(set(ids))
maps = {k:v for k,v in zip(names, ids)}
tweets['writer_id'] = tweets['writer'].map(maps)

# create ids for ticker_symbol column
# note: reusing seed 1 restarts the same random sequence, so the ticker IDs
# repeat the first writer IDs; the two ID sets are stored in separate tables
np.random.seed(1)
symbol = tweets['ticker_symbol'].unique().tolist()
sym_id = np.random.randint(low=1e9, high=1e10, size = len(symbol))
len(set(sym_id))
mappings = {k:v for k,v in zip(symbol, sym_id)}
tweets['ticker_symbol_id'] = tweets['ticker_symbol'].map(mappings)

tweets.head()

Unnamed: 0,tweet_id,writer,post_date,body,comment_num,retweet_num,like_num,ticker_symbol,writer_id,ticker_symbol_id
0,550441509175443456,VisualStockRSRC,2015-01-01 00:00:57,"lx21 made $10,008 on $AAPL -Check it out! htt...",0,0,1,AAPL,9577843435,9577843435
1,550441672312512512,KeralaGuy77,2015-01-01 00:01:36,Insanity of today weirdo massive selling. $aap...,0,0,0,AAPL,5005303368,9577843435
2,550441732014223360,DozenStocks,2015-01-01 00:01:50,S&P100 #Stocks Performance $HD $LOW $SBUX $TGT...,0,0,0,AMZN,2703301249,5005303368
3,550442977802207232,ShowDreamCar,2015-01-01 00:06:47,$GM $TSLA: Volkswagen Pushes 2014 Record Recal...,0,0,1,TSLA,2666063943,2703301249
4,550443807834402816,i_Know_First,2015-01-01 00:10:05,Swing Trading: Up To 8.91% Return In 14 Days h...,0,0,1,AAPL,8171504636,9577843435
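
Random integers can collide, and the output above shows the same number serving as both a writer ID and a ticker symbol ID. As a hedged alternative, not the approach used in this notebook, `pd.factorize` produces deterministic, collision-free surrogate keys. The toy frame and the choice of 1-based IDs are assumptions of this sketch:

```python
import pandas as pd

# Toy frame standing in for the merged tweets data (values are made up).
demo = pd.DataFrame({
    "writer": ["alice", "bob", "alice", None],
    "ticker_symbol": ["AAPL", "TSLA", "AAPL", "AMZN"],
})

# factorize assigns 0, 1, 2, ... in order of first appearance; missing values get -1.
writer_codes, writer_names = pd.factorize(demo["writer"])
demo["writer_id"] = writer_codes + 1  # 1-based IDs; a missing writer becomes 0

symbol_codes, symbol_names = pd.factorize(demo["ticker_symbol"])
demo["ticker_symbol_id"] = symbol_codes + 1

print(demo)
```

The IDs depend only on the order in which values first appear, so rerunning the cell on the same data gives the same mapping without a random seed.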


##### Give the ticker symbols in the price dataset the same IDs as in the tweet data

In [20]:
# create ids for ticker_symbol column in price dataset
mappings = {k:v for k,v in zip(tweets["ticker_symbol"].unique(), tweets["ticker_symbol_id"].unique())}
price['ticker_symbol_id'] = price['ticker_symbol'].map(mappings)
price.groupby("ticker_symbol")["ticker_symbol_id"].unique()

ticker_symbol
AAPL     [9577843435]
AMZN     [5005303368]
GOOG     [2666063943]
GOOGL    [7590257550]
MSFT     [8171504636]
TSLA     [2703301249]
Name: ticker_symbol_id, dtype: object

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

A star schema is chosen for the database. It has one fact table holding the Twitter data and four supporting dimension tables. A star schema allows flexible queries by cleanly separating each dimension into its own table. It also makes it straightforward to build OLAP cubes.

##### Fact table

contains twitter data:

- tweet_id bigint PRIMARY KEY
- date_time TIMESTAMP
- writer_id bigint
- ticker_symbol_id bigint
- body text NOT NULL
- comment_num int
- retweet_num int
- like_num int


##### User Dimension Table

- writer_id bigint PRIMARY KEY
- writer text

##### Company Dimension Table

- ticker_symbol_id bigint PRIMARY KEY
- ticker_symbol text

##### Value Dimension Table

- day_date TIMESTAMP PRIMARY KEY
- volume int
- open_value float
- high_value float
- low_value float
- ticker_symbol_id bigint

##### Time Dimension Table

- date_time TIMESTAMP PRIMARY KEY
- hour int
- day int
- week int
- month int
- year int
- weekday text
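
To illustrate how this schema supports analytical queries, the sketch below joins the fact table to the company and value dimensions. It uses an in-memory SQLite database as a stand-in for PostgreSQL, keeps only a few of the columns, and all inserted rows are made up. Joining on the date part of `date_time` plus `ticker_symbol_id` is an assumption about how tweets line up with daily values.

```python
import sqlite3

# SQLite in-memory database as a stand-in for PostgreSQL; rows are made up.
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.executescript("""
CREATE TABLE company (ticker_symbol_id INTEGER PRIMARY KEY, ticker_symbol TEXT);
CREATE TABLE "value" (day_date TEXT, ticker_symbol_id INTEGER, high_value REAL, low_value REAL);
CREATE TABLE tweets (tweet_id INTEGER PRIMARY KEY, date_time TEXT, ticker_symbol_id INTEGER, body TEXT NOT NULL);
INSERT INTO company VALUES (1, 'AAPL'), (2, 'TSLA');
INSERT INTO "value" VALUES ('2015-01-02', 1, 110.0, 106.0), ('2015-01-02', 2, 220.0, 210.0);
INSERT INTO tweets VALUES
    (10, '2015-01-02 09:30:00', 1, 'first'),
    (11, '2015-01-02 10:00:00', 1, 'second'),
    (12, '2015-01-02 11:00:00', 2, 'third');
""")

# Tweets per company and day, next to that day's price range.
cur.execute("""
SELECT c.ticker_symbol, v.day_date, COUNT(t.tweet_id) AS n_tweets,
       v.high_value - v.low_value AS day_range
FROM tweets AS t
JOIN company AS c ON c.ticker_symbol_id = t.ticker_symbol_id
JOIN "value" AS v ON v.ticker_symbol_id = t.ticker_symbol_id
                 AND v.day_date = date(t.date_time)
GROUP BY c.ticker_symbol, v.day_date, v.high_value, v.low_value
ORDER BY c.ticker_symbol
""")
rows = cur.fetchall()
print(rows)
conn.close()
```

In PostgreSQL the date comparison would be written with a cast such as `t.date_time::date` instead of SQLite's `date()` function.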



#### 3.2 Mapping Out Data Pipelines

##### Pipeline steps

   * Preprocess and clean up the data as explained in Step 2
   * Join the tweet data with the company symbol data
   * Create the time table from the post_date column of the tweet data
   * Create the tables by running create_tables.py
   * Insert the data into the tables

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [21]:
# After running create_tables.py, insert the data into the database
conn = psycopg2.connect("dbname=capstone user=postgres password=password")
cur = conn.cursor()
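
The connection string above hard-codes the password in the notebook. A minimal sketch of reading the credentials from environment variables instead. The function `build_dsn` and the `CAPSTONE_*` variable names are hypothetical conventions chosen for this example:

```python
import os

def build_dsn() -> str:
    """Assemble a libpq-style connection string from environment variables."""
    # The variable names below are a convention chosen for this sketch.
    dbname = os.environ.get("CAPSTONE_DB", "capstone")
    user = os.environ.get("CAPSTONE_USER", "postgres")
    password = os.environ["CAPSTONE_PASSWORD"]  # raises KeyError if not set
    return f"dbname={dbname} user={user} password={password}"

# Usage (requires psycopg2 and a running PostgreSQL server):
#   conn = psycopg2.connect(build_dsn())
```

Failing with a `KeyError` when the password is missing makes a misconfigured environment visible before any connection attempt.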

In [22]:
def process_tweet_data(cur, df):
    """
    Description: This function inserts data from the tweets dataframe into three tables (tweets, users and company) in the PostgreSQL database.
    Arguments:
        cur: the cursor object. 
        df: the dataframe. 
    Returns:
        None
    """

    # insert tweet records
    df_tweet = df[["tweet_id", "post_date", "writer_id", "ticker_symbol_id", "body", "comment_num", "retweet_num", "like_num"]]

    for index, row in df_tweet.iterrows():
        cur.execute(fact_tweet_table_insert, list(row.values))
    # commit once per table instead of once per row
    cur.connection.commit()

    # insert user records, one per writer
    df_user = df[["writer_id", "writer"]]
    df_user = df_user.drop_duplicates(subset = ["writer_id"])

    for index, row in df_user.iterrows():
        cur.execute(user_table_insert, list(row.values))
    cur.connection.commit()

    # insert company symbol data, one record per company
    df_company = df[["ticker_symbol_id", "ticker_symbol"]]
    df_company = df_company.drop_duplicates(subset = ["ticker_symbol_id"])

    for index, row in df_company.iterrows():
        cur.execute(company_table_insert, list(row.values))
    cur.connection.commit()

In [23]:
def process_time_data(cur, df):
    """
    Description: This function saves the time data from the tweet dataframe into one table (time) in the PostgreSQL database.
    Arguments:
        cur: the cursor object. 
        df: the dataframe; its post_date column must already be of datetime type. 
    Returns:
        None
    """

    # expand time data to lower units like day, hour etc.
    time_data = []
    for element in df['post_date']:
        time_data.append([element, element.hour, element.day, element.week, element.month, element.year, element.day_name()])
    column_labels = ('date_time', 'hour', 'day', 'week', 'month', 'year', 'weekday')
    time_df = pd.DataFrame(time_data, columns=column_labels)

    # keep one record per timestamp, since date_time is the primary key
    df_time = time_df.drop_duplicates(subset = ["date_time"])

    # insert time data and commit once at the end
    for index, row in df_time.iterrows():
        cur.execute(time_table_insert, list(row.values))
    cur.connection.commit()

In [24]:
def process_value_data(cur, df):
    """
    Description: This function saves data from the price dataframe into one table (value) in the PostgreSQL database.
    Arguments:
        cur: the cursor object. 
        df: dataframe. 
    Returns:
        None
    """

    # get values for the value table
    df_value = df[["day_date", "volume", "open_value", "high_value", "low_value", "ticker_symbol_id"]]

    # insert data into the value table and commit once at the end
    for index, row in df_value.iterrows():
        cur.execute(value_table_insert, list(row.values))
    cur.connection.commit()

In [26]:
#insert the tweet data
process_tweet_data(cur, tweets)

In [28]:
# insert the time data
process_time_data(cur, tweet)

In [30]:
# insert value data
process_value_data(cur, price)
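
Executing one statement per row is slow for millions of records. The sketch below shows the general idea of batching rows through the DB-API `executemany` call with a single commit. The helper `insert_rows` is hypothetical, and SQLite stands in for PostgreSQL so that the example runs without a server:

```python
import sqlite3

def insert_rows(conn, insert_sql, rows):
    """Insert all rows with one executemany call and a single commit."""
    cur = conn.cursor()
    cur.executemany(insert_sql, rows)
    conn.commit()

# SQLite stands in for PostgreSQL here; sqlite3 uses "?" placeholders, psycopg2 uses "%s".
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE company (ticker_symbol_id INTEGER PRIMARY KEY, ticker_symbol TEXT)")

# Made-up rows; with a dataframe, df.itertuples(index=False, name=None) yields such tuples.
rows = [(1, "AAPL"), (2, "TSLA"), (3, "AMZN")]
insert_rows(conn, "INSERT INTO company VALUES (?, ?)", rows)

count = conn.execute("SELECT COUNT(*) FROM company").fetchone()[0]
print(count)
conn.close()
```

With psycopg2, `psycopg2.extras.execute_batch(cur, sql, rows)` follows the same pattern and is usually faster than `executemany`, because it sends the statements to the server in pages.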

#### 4.2 Data Quality Checks
The data quality checks below help ensure the pipeline ran as expected. Such checks can include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
1- The function below checks whether each table that we created contains data. If any table is empty, the data quality check fails for that table.

In [31]:
def quality_check(table, description):
    '''
    Input: Postgres table, description of the table
    Output: Prints the number of rows in the table, or a failure message if the table is empty
    '''

    # count the rows in the database instead of fetching every row into memory
    cur.execute(f"SELECT COUNT(*) FROM {table}")
    result = cur.fetchone()[0]
    if result > 0:
        print("Data quality check successful for {} with {} records".format(description, result))
    else:
        print("Data quality check failed for {} with zero records".format(description))

In [32]:
quality_check("tweets", "tweets table")
quality_check("users", "user table")
quality_check("company", "company table")
quality_check("value", "value table")
quality_check("time", "time table")

Data quality check successful for tweets table with 3717964 records
Data quality check successful for user table with 140131 records
Data quality check successful for company table with 6 records
Data quality check successful for value table with 17528 records
Data quality check successful for time table with 3421363 records
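
A row count shows that data arrived, but not that the fact table and the dimension tables agree. As a sketch of an additional integrity check, the hypothetical helper `orphan_count` below counts fact rows whose key has no match in a dimension table. It runs against an in-memory SQLite database with made-up rows so that it works without a PostgreSQL server:

```python
import sqlite3

def orphan_count(cur, fact, dim, key):
    """Count fact rows whose key has no matching row in the dimension table."""
    # Table and column names come from trusted code, never from user input.
    cur.execute(
        f"SELECT COUNT(*) FROM {fact} AS f "
        f"LEFT JOIN {dim} AS d ON d.{key} = f.{key} "
        f"WHERE d.{key} IS NULL"
    )
    return cur.fetchone()[0]

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.executescript("""
CREATE TABLE company (ticker_symbol_id INTEGER PRIMARY KEY, ticker_symbol TEXT);
CREATE TABLE tweets (tweet_id INTEGER PRIMARY KEY, ticker_symbol_id INTEGER);
INSERT INTO company VALUES (1, 'AAPL');
INSERT INTO tweets VALUES (10, 1), (11, 1), (12, 99);
""")
orphans = orphan_count(cur, "tweets", "company", "ticker_symbol_id")
print(f"tweets without a matching company: {orphans}")
conn.close()
```

A result of zero for every fact-to-dimension pair would indicate that all joins described in the data model can be resolved.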


2- The function below checks whether a column has the expected type. It prints True if the first value of the column is an instance of the expected Python type.

In [33]:
def type_check(table, col, type_col):
    '''
    Input: Postgres table, table column, expected Python type of the column
    Output: Print True if the first value of the column has the expected type
    '''

    # one row is enough, since only the first value is inspected
    cur.execute(f"SELECT {col} FROM {table} LIMIT 1")
    row = cur.fetchone()
    result = isinstance(row[0], type_col)
    print(result)

In [34]:
type_check("tweets", "writer_id", int)
type_check("tweets", "body", str)
type_check("value", "high_value", float)

True
True
True


#### 4.3 Data dictionary 

##### Fact table

contains twitter data:

- tweet_id bigint PRIMARY KEY ID number for each tweet
- date_time TIMESTAMP date and time of the tweet (foreign key)
- writer_id bigint a unique ID for each person who posted a tweet (foreign key)
- ticker_symbol_id bigint a unique ID for every company (foreign key)
- body text NOT NULL text of the tweet posted
- comment_num int number of comments the tweet received
- retweet_num int number of retweets the tweet received
- like_num int number of likes the tweet received


##### User Dimension Table

can be joined to the fact table by the writer_id 

- writer_id bigint PRIMARY KEY unique ID for each writer of the tweet
- writer text screen name for the writer of the tweet

##### Company Dimension Table

can be joined to the fact table by the ticker_symbol_id

- ticker_symbol_id bigint PRIMARY KEY unique ID for the company
- ticker_symbol text ticker symbol of the companies included in the analysis

##### Value Dimension Table


can be joined to the fact table by matching day_date to the date part of date_time, together with ticker_symbol_id

- day_date TIMESTAMP PRIMARY KEY date of each value record
- volume int number of shares traded on that day
- open_value float price of a share of the company at market open
- high_value float the highest price of a share of the company during the trading day
- low_value float the lowest price of a share of the company during the trading day
- ticker_symbol_id bigint unique ID for each company

##### Time Dimension Table

can be joined to the fact table by the date_time

- date_time TIMESTAMP PRIMARY KEY date and time in which the tweet was posted
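- hour int hour of the day (0-23)
- day int day of the month
- week int week of the year
- month int month of the year (1-12)
- year int calendar year
- weekday text name of the day of the week (e.g., Thursday)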

### Step 5: Complete Project Write Up
 
#### Choice of tools and technologies

Pandas is used to preprocess and clean up the data. It is an efficient tool for tabular data of this size (about 3.7 million tweets). The project is written in Python because it is the language I am most comfortable with and one of the most widely used languages for this kind of work. PostgreSQL, accessed through psycopg2, stores the star schema.
Since the data will be analyzed on a monthly basis, a monthly update of the data is recommended.

#### What if?
1. The data was increased by 100x.

    * Spark can be used to process the data in an efficient and distributed way. Amazon Redshift is also helpful in such a scenario since it is an analytical database optimized for such heavy workloads.
    
2. The data populates a dashboard that must be updated on a daily basis by 7am every day.

    * Airflow can be used to schedule the pipeline in this scenario. In case of failures, use DAG retries and send failure emails.

3. The database needed to be accessed by 100+ people.
    * Redshift is a good fit in this case because it can scale to serve many concurrent users and offers high read performance.

In [35]:
conn.close()