In [1]:
from datetime import datetime, timedelta
import os
import configparser
import requests
import pandas as pd
import numpy as np
import boto3
import io

In [2]:
def read_config_file(config_path='../configs/param.cfg'):
    """
    Read the pipeline configuration file
    param:
        config_path: [String] Location of the INI-style configuration file.
            Defaults to '../configs/param.cfg' (the path that used to be hard-coded).
    :return:
    RawConfigParser with config data inside
    :raises:
    FileNotFoundError when the file is missing or cannot be opened
    """
    config = configparser.RawConfigParser()
    # read() silently skips files it cannot open and returns the list of files
    # it actually parsed, so an empty list means no configuration was loaded.
    # Failing here is clearer than a KeyError on the first config lookup later.
    if not config.read(config_path):
        raise FileNotFoundError(
            f'Configuration file not found or unreadable: {config_path}')
    return config

def print_log(message):
    """
    Write a message to the console, prefixed with the current date and time
    param:
        message: [String] Text to print after the timestamp
    """
    timestamp = datetime.now()
    # Plain concatenation is kept so a non-string message still raises TypeError.
    print(f'{timestamp} - ' + message)

def download_data(config, data_name):
    """
    Send 1 request to the data source and save the response body to disk
    param:
        config: configuration object; the URL is read from
            config['SOURCE_URL'][f'so_{data_name}_zip'] and the destination
            path from config['FILE_LOCATION'][f'loc_so_{data_name}']
        data_name: [String] Name used to build the two config keys above
            and the log messages
    :return:
    none
    the response body is written to the configured destination path
    :raises:
    requests.HTTPError when the server answers with a 4xx/5xx status
    """
    path_question = config['FILE_LOCATION'][f'loc_so_{data_name}']
    url_question = config['SOURCE_URL'][f'so_{data_name}_zip']

    req_question = requests.get(url_question, allow_redirects=True)
    # Without this check an HTML error page would be written to disk as if it
    # were the zip archive and only fail later when pandas tries to read it.
    req_question.raise_for_status()
    print_log(f'{data_name}.zip downloaded')

    # Context manager guarantees the handle is flushed and closed.
    with open(path_question, 'wb') as zip_file:
        zip_file.write(req_question.content)
    print_log(f'{data_name}.zip saved to local')


def add_hash_column(config, data_name, nrows=1000000):
    """
    This function will add a column hashing all data of each row,
    this column will be used to track changes in data source in FUTURE USE
    param:
        config: configuration object; the zipped CSV is read from
            config['FILE_LOCATION'][f'loc_so_{data_name}'] and the result is
            written to config['FILE_LOCATION'][f'loc_{data_name}_with_hash']
        data_name: [String] Name used to build the two config keys above
            and the log messages
        nrows: [Int or None] Maximum number of rows read from the source file,
            passed straight to pandas.read_csv. Defaults to 1000000 (the cap
            that used to be hard-coded); None reads the whole file.
    :return:
    none
    the rows that were read get a hash_key column and are saved as a CSV file
    """
    path_data = config['FILE_LOCATION'][f'loc_so_{data_name}']
    path_output = config['FILE_LOCATION'][f'loc_{data_name}_with_hash']
    df = pd.read_csv(path_data, compression='zip', nrows=nrows)

    # Add hash_key column to track data changes in each row.
    # Each row is rendered to text with to_string() and that single string is
    # hashed; the per-row result is a one-element Series, so apply() yields a
    # one-column frame that is assigned to hash_key.
    # NOTE(review): this is a Python-level loop over every row and dominates
    # the runtime; a vectorised hash would be faster but would change the
    # stored hash values, so it is intentionally left as is.
    df['hash_key'] = df.apply(lambda row: pd.util.hash_pandas_object(
        pd.Series(row.to_string())), axis=1)
    print_log(f'hash_key for {data_name} table created')

    # The default index is written too, so the output has an unnamed first column.
    df.to_csv(path_output)
    print_log(f'{data_name} table with hash_key saved into local')

def transform_question_data(config):
    """
    This function will create dimDate table by extracting from [questions] table
    :param config: environment configuration; reads loc_question_with_hash,
        loc_dim_date and loc_fact_question from the FILE_LOCATION section
    :return:
    none
    dimDate table created as CSV file
    factQuestion will be updated by adding 3 date columns, renaming 3 datetime
    columns and adding a status column (Closed / Deleted / Open)
    """
    date_columns = ['CreationDate', 'ClosedDate', 'DeletionDate']

    path_question = config['FILE_LOCATION']['loc_question_with_hash']
    path_dim_date = config['FILE_LOCATION']['loc_dim_date']
    path_fact_question = config['FILE_LOCATION']['loc_fact_question']
    print_log('Done reading file location config')

    df_question = pd.read_csv(path_question)
    print_log('Done reading question_with_hash csv')

    # BEGIN: Create dim_date table
    # - Get all date columns in question table and exclude time in datetime data.
    #   The conversion runs once per column (default axis) instead of once per
    #   row; the resulting values are the same but the parsing is vectorised.
    df_date_stg = df_question[date_columns].apply(
        lambda column: pd.to_datetime(column).dt.date)

    # - Combine 3 date columns into one, remove null and duplicates
    sr_date = pd.concat([df_date_stg[column] for column in date_columns],
                        axis=0,
                        ignore_index=True).dropna().drop_duplicates().sort_values()

    # - Create dim_date dataframe, add relative columns
    df_date = pd.DataFrame({'date': sr_date,
                            'day': sr_date.apply(lambda d: d.day),
                            'weekday': sr_date.apply(lambda d: d.weekday()),
                            'month': sr_date.apply(lambda d: d.month),
                            'quarter': sr_date.apply(lambda d: int(np.ceil(d.month / 3))),
                            'year': sr_date.apply(lambda d: d.year)}).reset_index(drop=True)
    df_date.to_csv(path_dim_date)
    print_log('dimDate table saved into local')
    # END: Create dim_date table

    # BEGIN: Transform fact_question table
    # - Rename the original datetime columns and append the date-only columns
    #   under the original names
    df_question = pd.concat(
        [df_question.rename(columns={'CreationDate': 'CreationDateTime',
                                     'ClosedDate': 'ClosedDateTime',
                                     'DeletionDate': 'DeletionDateTime'}),
         df_date_stg],
        axis=1)
    print_log('question table updated - adding 3 date columns, rename 3 datetime columns')

    # - Add status column in fact_question table.
    #   At this point ClosedDate / DeletionDate hold datetime.date objects (or
    #   NaT), never strings, so an isinstance(..., str) test on them is always
    #   False and every row would be labelled 'Open'. Test for presence on the
    #   original datetime columns instead. Conditions are evaluated in order,
    #   so 'Closed' wins when both dates are present.
    df_question['status'] = np.select(
        [df_question['ClosedDateTime'].notna(),
         df_question['DeletionDateTime'].notna()],
        ['Closed', 'Deleted'],
        default='Open')
    df_question.to_csv(path_fact_question)
    print_log('factQuestion table saved into local')
    # END: Transform fact_question table


def transform_question_tag_data(config):
    """
    This function will create dimTag table which is extracted from [question tags] data
    :param config: environment configuration; reads loc_question_tag_with_hash,
        loc_fact_question_tag and loc_dim_tag from the FILE_LOCATION section
    :return:
    none
    dimTag table created as csv file and factQuestionTag table updated by
    replacing the Tag text with the TagID key from dimTag
    """
    path_question_tag = config['FILE_LOCATION']['loc_question_tag_with_hash']
    path_fact_question_tag = config['FILE_LOCATION']['loc_fact_question_tag']
    path_dim_tag = config['FILE_LOCATION']['loc_dim_tag']
    print_log('file location config read successful')

    df_question_tag = pd.read_csv(path_question_tag)

    # START: Create dim_tag table
    # - Take the distinct values of the Tag column; the fresh 0-based index
    #   becomes the surrogate key and is renamed for business understanding
    df_tag = (df_question_tag['Tag']
              .drop_duplicates()
              .reset_index(drop=True)
              .reset_index()
              .rename(columns={'index': 'TagID'}))
    print_log('dimTag table created')
    df_tag.to_csv(path_dim_tag)
    print_log('dimTag table saved into local')
    # END: Create dim_tag table

    # START: Transform fact_question_tag table
    # rename() needs columns=...; a bare mapping is applied to the row index
    # labels and leaves the column names untouched.
    # NOTE(review): the rename is still a no-op if the source column is spelled
    # differently (e.g. 'Id') - confirm against the input file.
    df_question_tag = (df_question_tag
                       .merge(df_tag, on='Tag')
                       .drop('Tag', axis=1)
                       .rename(columns={'ID': 'QuestionID'}))
    df_question_tag.to_csv(path_fact_question_tag)
    print_log('factQuestionTag table updated and saved into local')
    # END: Transform fact_question_tag table


def push_data_to_s3(config, bucket_name='so-question-dl'):
    """
    This function will push processed CSV files into S3 bucket
    :param config: environment configuration; AWS keys are read from the
        AWS_CREDENTIAL section and local file paths from FILE_LOCATION
    :param bucket_name: target S3 bucket, defaults to 'so-question-dl'
        (the bucket that used to be hard-coded)
    :return:
    none, 6 csv files will be pushed into S3 bucket
    """
    # (S3 object key, FILE_LOCATION option holding the local path), in upload order
    uploads = [
        ('raw/questions_hash.csv', 'loc_question_with_hash'),
        ('raw/question_tags_hash.csv', 'loc_question_tag_with_hash'),
        ('dimDate.csv', 'loc_dim_date'),
        ('dimTag.csv', 'loc_dim_tag'),
        ('factQuestion.csv', 'loc_fact_question'),
        ('factQuestionTag.csv', 'loc_fact_question_tag'),
    ]

    # Create AWS Credential Session
    session = boto3.session.Session(aws_access_key_id=config["AWS_CREDENTIAL"]["AWS_ACCESS_KEY_ID"],
                                    aws_secret_access_key=config["AWS_CREDENTIAL"]["AWS_SECRET_ACCESS_KEY"],
                                    aws_session_token=config["AWS_CREDENTIAL"]["AWS_SESSION_TOKEN"])
    print_log('authenticated session to S3 created')

    # Create resource and one object handle per target key
    s3 = session.resource('s3')
    s3_objects = [s3.Object(bucket_name=bucket_name, key=key) for key, _ in uploads]
    print_log('objects in S3 located')

    # Get location of csv files which will be pushed into S3
    local_paths = [config['FILE_LOCATION'][option] for _, option in uploads]
    print_log('processed csv files loaded from local')

    # Put each file into the bucket; the with-block closes every handle once
    # its upload has finished instead of leaking six open files.
    for s3_object, local_path in zip(s3_objects, local_paths):
        with open(local_path, 'rb') as csv_file:
            s3_object.put(Body=csv_file)
        print_log(f'{local_path} uploaded')

In [3]:
# Run the local part of the pipeline: load the configuration, download both
# source archives, add the hash_key column to each, then build the tables.
config = read_config_file()

source_tables = ('question', 'question_tag')

for table_name in source_tables:
    download_data(config, table_name)

for table_name in source_tables:
    add_hash_column(config, table_name)

transform_question_data(config)
transform_question_tag_data(config)

2021-11-15 22:57:48.643191 - question.zip downloaded
2021-11-15 22:57:48.931492 - question.zip saved to local
2021-11-15 22:58:12.710377 - question_tag.zip downloaded
2021-11-15 22:58:13.045308 - question_tag.zip saved to local
2021-11-15 23:10:09.436749 - hash_key for question table created
2021-11-15 23:10:12.986345 - question table with hash_key saved into local
2021-11-15 23:21:51.774008 - hash_key for question_tag table created
2021-11-15 23:21:54.988365 - question_tag table with hash_key saved into local
2021-11-15 23:21:54.988365 - Done reading file location config
2021-11-15 23:21:56.741681 - Done reading question_with_hash csv
2021-11-15 23:28:41.552880 - dimDate table saved into local
2021-11-15 23:28:41.663235 - question table updated - adding 3 date columns, rename 3 datetime columns
2021-11-15 23:29:01.869898 - factQuestion table saved into local
2021-11-15 23:29:02.082892 - file location config read successful
2021-11-15 23:29:02.827004 - dimTag table created
2021-11-15 2

In [64]:
# Upload the hashed source files and the generated fact/dim CSV files to S3.
# Relies on `config` from the pipeline cell and on the CSV files it wrote to disk.
push_data_to_s3(config)