# Core Data Management Pipeline
### Features:
- From multiple sources (twitter, github, apis), do the following:
- extract raw data (as-is) into the `data/raw` folder
- prepare raw data and save as prepped data into the `data/prep` folder
- assemble final data set and save it as the `data/final` folder


### To-Do's:
- MUST:
    - Twitter data needs 2 steps: [1] extract label/tweet_id [2] query tweet_id for text
- SHOULD:
    - Better logging
    - move common tools that i will use many times into a common.py file
    - improved metadata on final dataset
        - for example, is there treatment needed from multi-label perspective? if so this should be carried into the final dataset
        - [tutorial: tweepy](https://medium.com/analytics-vidhya/fetch-tweets-using-their-ids-with-tweepy-twitter-api-and-python-ee7a22dcb845)
- COULD:

- WON'T: 
    - ML etl should not be in this notebook
    - ETL on user handles? 

#### Dependencies

In [1]:
import logging
import pandas as pd
from pandas.io.json import json_normalize #package for flattening json in pandas df
from datetime import datetime as dt
import requests
import json

## TODO: Data Preparation Helpers

- this is baseline, "must transform" data engineering ... preparation is only to standardize dataset into tabular format from disparate sources
- this section does not do ML-style transformation related to feature engineering or enrichment


In [2]:

def get_current_time():
    return dt.now().strftime("%Y%m%d_%H%M")

# ------- data cleaning functions --------
# data cleaning pipeline for hate-1:
def hate_1_clean(df_raw):
    return (
        df_raw
        [0]
        .str.split(":",expand=True, n = 1)
        .rename(columns = {0:'label', 1:'text'})
        .assign()
    )

# data cleaning pipeline for hate-1:
def hate_2_clean(df_raw):
    return (
    df_raw
    .query("cn_id.str.startswith('EN')", engine = 'python')
    [['cn_id','hateSpeech','counterSpeech']]
    .melt(id_vars = 'cn_id')
    .assign(variable = lambda x: x.variable.replace({'hateSpeech':1,'counterSpeech':0}))
    .query("value.duplicated() == False", engine = 'python')
    .drop(columns = 'cn_id')
    .rename(columns = {'variable':'label','value':'text'})
        
)

## TODO: Data Extraction Metadata

- Purpose: This section tells the pipeline what to do and when

In [3]:

# ------- data source meatdata ------------
extraction_meta = {    
    'sexism_1':{
        'type':'twitter',
        'url_data':'https://raw.githubusercontent.com/ZeerakW/hatespeech/master/NAACL_SRW_2016.csv',
        'url_gh':'https://github.com/ZeerakW/hatespeech',
        'paper':None,
        'prep_function':None
    },

    'hate_1':{
        'type':'flat',
        'url_data':'https://raw.githubusercontent.com/sjtuprog/fox-news-comments/master/annotated-threads/all-comments.txt',
        'url_gh':'https://github.com/sjtuprog/fox-news-comments',
        'paper':'https://arxiv.org/pdf/1710.07395.pdf',
        'prep_function':hate_1_clean
    },
    
    'hate_2':{
        'type':'json',
        'url_data':'https://raw.githubusercontent.com/marcoguerini/CONAN/master/CONAN/CONAN.json',
        'parent_node':'conan',
        'url_gh':None,
        'paper':None,
        'prep_function':hate_2_clean
    }
}

## TODO: Extraction Management

In [4]:

# Set up the logger
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('Extraction')

# keys are defined in json above
for key in extraction_meta.keys():
    # grab info from metadata
    data_set_type = extraction_meta[key]['type']
    url_data = extraction_meta[key]['url_data']
    
    logger.info(f" -- dataset-id: {key} -- type: {data_set_type}")
    
    # ----- JSON ----- #
    if data_set_type == 'json':
        # grab info from metadata
        parent_node = url_data = extraction_meta[key]['parent_node']
        
        # request
        payload = requests.get(extraction_meta[key]['url_data'])
        content = payload.content
        raw_json = json.loads(content)

        if parent_node != None:
            logger.info(f"Filter Parent Node")
            raw_json = raw_json[parent_node]

        extraction_meta[key]['df'] = pd.DataFrame(raw_json)
        extraction_meta[key]['df'].to_csv(f"./data/raw/{key}_raw_data.csv",index = None)
            
    elif data_set_type == 'flat':
        extraction_meta[key]['df'] = pd.read_table(extraction_meta[key]['url_data'], header=None)
        extraction_meta[key]['df'].to_csv(f"./data/raw/{key}_raw_data.csv",index = None)
    
    elif data_set_type == 'twitter':
        """
        will need to add more logic here to request twitter.
        better idea to split this into two parts.
        actual twitter extraction will be a big deal.
        """
        extraction_meta[key]['df_twitter_id_labels'] = pd.read_table(extraction_meta[key]['url_data'], header=None)
        extraction_meta[key]['df_twitter_id_labels'].to_csv(f"./data/raw/{key}_twitterindex.csv",index = None)

INFO:Extraction: -- dataset-id: sexism_1 -- type: twitter
INFO:Extraction: -- dataset-id: hate_1 -- type: flat
INFO:Extraction: -- dataset-id: hate_2 -- type: json
INFO:Extraction:Filter Parent Node


## TODO: Data Prep Orchestration

Working standards:
- `label` is the column for truth values
- `text` is column for raw feature

Currently working:
- For text based datasets 


Not started:
- Twitter
- Logging info
- Logging important eda stuff like class imbalance or total n-size

In [5]:
# for the entire extraction meta object
for key in extraction_meta.keys():
    
    logger.info(f" -- dataset-id: {key} -- type: {data_set_type}")
    
    
    # if applicable, apply a prep function and write the file out
    if (extraction_meta[key].get('df',None) is not None):
        (
            extraction_meta[key]['prep_function'](extraction_meta[key]['df'])
            .to_csv(f"./data/prep/{key}_prepped.csv",index = None)
        )
    else:
        pass

INFO:Extraction: -- dataset-id: sexism_1 -- type: json
INFO:Extraction: -- dataset-id: hate_1 -- type: json
INFO:Extraction: -- dataset-id: hate_2 -- type: json
INFO:numexpr.utils:NumExpr defaulting to 8 threads.


## TODO: data assembly

should haves:
- more metadata? only dataset file name for now

In [6]:
import os

fp_prep = './data/prep/'
fp_final = './data/final/'

prepped_data = list()
for i in os.listdir(fp_prep):
    if '_prepped' in i:
        prepped_data_i = (
            pd.read_csv(fp_prep + i)
            .assign(
                dataset = i
            )
        )
        prepped_data.append(prepped_data_i)
        
    else:
        pass

final_data = pd.concat(prepped_data)
final_data.to_csv(f'{fp_final}clean_data_version_{get_current_time()}.csv', index = None)