# Setup

In [3]:
import glob
import json
import os
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import timedelta

import boto3
import pandas as pd
from sagemaker import get_execution_role

try:
    import pandas_redshift as pr
except:
    !pip install pandas_redshift
    import pandas_redshift as pr

In [4]:
# Point this machine's default network route at gateway 10.84.0.165 on
# interface eth2: delete the current default route, then add the new one.
# Requires sudo and changes routing for ALL outbound traffic on the host,
# not only for this notebook. If the second command fails, the host is left
# without a default route.
# NOTE(review): presumably needed so this SageMaker instance can reach the
# Redshift cluster / private network used below — confirm.
!sudo ip route del default
!sudo ip route add default via 10.84.0.165 dev eth2

# Read Data

In [5]:
class ReadData:
    """Load fraud-detection input data from CSV files (local or S3) or Redshift.

    Redshift credentials are read from a JSON file when the object is created.
    The keys used are 'db.redshift.db_name', 'db.redshift.host',
    'db.redshift.port', 'db.redshift.user' and 'db.redshift.pass'.
    """

    def __init__(self, credentials_file = './config/redshift.json'):
        """Load Redshift connection settings from the JSON *credentials_file*."""
        with open(credentials_file) as raw_file:
            self.__creds = json.load(raw_file)

    def _read_csv_file(self, file):
        """Read one CSV (local path, buffer or ``s3://`` URI) into a DataFrame.

        ``low_memory=False`` makes pandas infer each column's dtype from the
        whole file rather than chunk by chunk, so columns do not end up with
        mixed types. No date parsing is done here; columns such as
        ``created_at`` are returned as read.
        """
        return pd.read_csv(file, low_memory=False)

    def __read_path(self, all_files):
        """Read *all_files* concurrently and concatenate them into one DataFrame.

        Raises ValueError when *all_files* is empty.
        """
        all_files = list(all_files)
        if not all_files:
            # pd.concat([]) would fail with the opaque "No objects to concatenate".
            raise ValueError('no input files found to read')
        # Threads let the individual file / network reads overlap.
        with ThreadPoolExecutor(max_workers=8) as executor:
            frames = list(executor.map(self._read_csv_file, all_files))
        return pd.concat(frames)

    def read_in_parallel_from_path(self, file_path):
        """Read every CSV matching a glob pattern, in parallel.

        file_path: glob pattern; ``**`` matches directories recursively.
        Returns the concatenation of all matched files. Files are read in
        sorted path order so the row order is deterministic.
        Raises ValueError if nothing matches.
        """
        print('reading...')
        start_time = time.time()
        all_files = sorted(glob.glob(file_path, recursive=True))

        df_temp = self.__read_path(all_files)

        time_elapsed = time.time() - start_time
        print('done with time:', str(timedelta(seconds=time_elapsed)))
        return df_temp

    def read_in_parallel_from_s3(self, bucket, file_path):
        """Read every object under an S3 prefix as CSV, in parallel.

        bucket: S3 bucket name.
        file_path: key prefix to list (e.g. a "folder" path inside the bucket).
        Keys ending in '/' (zero-byte folder placeholders) are skipped.
        Raises ValueError if no objects are found under the prefix.
        """
        print('reading...')
        start_time = time.time()

        s3 = boto3.client("s3")
        # A single list call returns at most 1000 keys; the paginator follows
        # continuation tokens so large prefixes are listed completely.
        paginator = s3.get_paginator('list_objects_v2')
        all_files = [
            f"s3://{bucket}/{obj['Key']}"
            for page in paginator.paginate(Bucket=bucket, Prefix=file_path)
            # 'Contents' is absent from a page when nothing matches the prefix.
            for obj in page.get('Contents', [])
            if not obj['Key'].endswith('/')
        ]

        df_temp = self.__read_path(all_files)

        time_elapsed = time.time() - start_time
        print('done with time:', str(timedelta(seconds=time_elapsed)))
        return df_temp

    def __verify_arguments(self, country, signup_app_id, device):
        """Turn filter arguments into comma-separated, quoted SQL literals.

        Each argument may be a single value or a list/tuple/set of values.
        Returns three strings such as ``"'US','CA'"`` for use inside
        ``IN (...)``. Embedded single quotes are doubled so a value cannot
        terminate the SQL string literal early.
        Raises ValueError for an empty collection (``IN ()`` is invalid SQL).
        """
        def to_sql_list(values):
            if not isinstance(values, (list, tuple, set)):
                values = [values]
            if not values:
                raise ValueError('filter lists must not be empty')
            return ','.join("'{}'".format(str(v).replace("'", "''")) for v in values)

        return to_sql_list(country), to_sql_list(signup_app_id), to_sql_list(device)

    def read_redshift_users(self, users_list):
        """Load all per-user tables from Redshift for an explicit set of users.

        users_list: one user id or a list/tuple/set of ids. Ids are placed
            unquoted inside ``IN (...)``, so each must be an integer (or an
            integer-like string).
        Returns (df_users, df_completed_act, df_answers, df_prof_hist,
        df_stat_hist, df_login, df_behavior).
        Raises ValueError if *users_list* is empty.
        """
        print('reading...')

        if not isinstance(users_list, (list, tuple, set)):
            users_list = [users_list]
        if not users_list:
            raise ValueError('users_list must not be empty')

        # int() validates each id and keeps arbitrary text out of the SQL.
        users_sql = ', '.join(str(int(i)) for i in users_list)

        query_users_temp_table = f"""
        CREATE TEMPORARY TABLE users as
        (SELECT DISTINCT(all_users.id) as user_id
        FROM prd_sj.sj_users as all_users
        WHERE id IN ({users_sql})
                )
            """
        return self.__query_redshift(query_users_temp_table)

    def read_redshift_date(self, start_date, end_date, signup_app_id=(1, 2, 3),
                           country=('US', 'AU', 'CA'),
                           device=('Desktop', 'Smartphone', 'Tablet', 'Other Non-Mobile', 'Other Mobile',
                                   'Robot', 'Smart-TV', 'Feature Phone'),
                           min_points=500):
        """Load all per-user tables for users who signed up in a date range.

        start_date: date string (year/month/day), inclusive lower bound on
            sj_users.created_at.
        end_date: date string (year/month/day), inclusive upper bound.
        signup_app_id: one id or a list/tuple of ids.
        country: one value or a list/tuple, matched against answers to
            question_id 8.
        device: one value or a list/tuple, matched against answers to
            question_id 220.
        min_points: minimum sj_users.lifetime_points.
        Returns (df_users, df_completed_act, df_answers, df_prof_hist,
        df_stat_hist, df_login, df_behavior).
        """
        print('reading...')
        country, signup_app_id, device = self.__verify_arguments(country, signup_app_id, device)

        query_users_temp_table = f"""
        CREATE TEMPORARY TABLE users as
        (SELECT DISTINCT(all_users.id) as user_id
        FROM prd_sj.sj_users as all_users
        WHERE all_users.created_at::date >= '{start_date}'
            AND all_users.created_at::date <= '{end_date}'
            AND all_users.lifetime_points >={min_points}
            AND all_users.signup_app_id IN ({signup_app_id})
            AND all_users.id IN
                    (SELECT user_id 
                    from prd_sj.sj_answers 
                    WHERE text_value IN ({country})
                    AND question_id=8)
            AND all_users.id IN
                    (SELECT user_id 
                    from prd_sj.sj_answers 
                    WHERE text_value IN ({device})
                    AND question_id=220)
                )
            """
        return self.__query_redshift(query_users_temp_table)

    def __query_redshift(self, query_users_temp_table):
        """Create the ``users`` temp table, then pull every per-user table.

        query_users_temp_table: SQL creating a temporary table named ``users``
            with a ``user_id`` column; every query below filters on it.
        Returns (df_users, df_completed_act, df_answers, df_prof_hist,
        df_stat_hist, df_login, df), where ``df`` holds the selected columns of
        prd_sj.user_activity_behavior for event_type_id 1 or 2.
        """
        def select_for_users(table, id_column='user_id'):
            # SELECT * from prd_sj.<table>, restricted to the temp-table users.
            return (f"SELECT * FROM prd_sj.{table} "
                    f"WHERE {id_column} IN (SELECT user_id FROM users)")

        behavior_columns = (
            "user_id, event_type_id, click_type_id, created_at, provider_id, "
            "device_platform, conversion_status_id, financial_points_promised, "
            "financial_points_paid, financial_revenue_earned, device_browser, "
            "device_display_height, device_display_width, device_ip, "
            "device_ip_asn, activity_type_id, activity_conv_rate_all, "
            "impression_row, impression_column, impression_position"
        )
        query_df_behavior = (
            f"SELECT {behavior_columns} FROM prd_sj.user_activity_behavior "
            "WHERE event_type_id IN (1,2) "
            "AND user_id IN (SELECT user_id FROM users)"
        )

        print('Connecting')
        start_time = time.time()
        pr.connect_to_redshift(dbname   = self.__creds['db.redshift.db_name'],
                               host     = self.__creds['db.redshift.host'],
                               port     = self.__creds['db.redshift.port'],
                               user     = self.__creds['db.redshift.user'],
                               password = self.__creds['db.redshift.pass'])

        print('Connection established')
        try:
            # Temporary tables only exist within the session that created them,
            # so every query must run on this same connection.
            pr.exec_commit(query_users_temp_table)

            df               = pr.redshift_to_pandas(query_df_behavior)
            df_users         = pr.redshift_to_pandas(select_for_users('sj_users', id_column='id'))
            df_answers       = pr.redshift_to_pandas(select_for_users('sj_answers'))
            df_prof_hist     = pr.redshift_to_pandas(select_for_users('sj_user_profile_history'))
            df_stat_hist     = pr.redshift_to_pandas(select_for_users('sj_user_status_history'))
            df_login         = pr.redshift_to_pandas(select_for_users('user_login_activity'))
            df_completed_act = pr.redshift_to_pandas(select_for_users('sj_user_completed_activities'))
        finally:
            # Release the connection even when a query fails.
            pr.close_up_shop()

        time_elapsed = time.time() - start_time
        print('done with time:', str(timedelta(seconds=time_elapsed)))
        return df_users, df_completed_act, df_answers, df_prof_hist, df_stat_hist, df_login, df

In [None]:
# per month -> join
# per class columns -> join

----

In [6]:
def verify_directory(save_dir):
    """Ensure the directory *save_dir* exists, creating missing parents too.

    ``exist_ok=True`` makes this a no-op when the directory already exists.
    It also avoids the race in checking ``os.path.exists`` first and then
    creating: another process could create the directory in between, and
    ``os.makedirs`` would then raise.
    """
    os.makedirs(save_dir, exist_ok=True)

        
def write_to_s3(file_path, delete_local=True, bucket= 'disqo-data-science-dev'):
    key = f"fraud_detection{file_path[file_path.find('/'):]}"
    boto3.Session().resource('s3').Bucket(bucket).Object(key).upload_file(file_path)
    print('Done writing to {}'.format('s3://{}/{}'.format(bucket, key)))
    
    if delete_local:
        os.remove(file_path)
        
        
def write_to_redshift_from_s3(file_path, redshift_table_name, append = False, bucket = "disqo-data-science-dev",
                             credentials_file = './config/redshift.json'):
    """Copy a CSV from S3 into a Redshift table, creating or appending.

    file_path: key of the CSV inside *bucket* (no ``s3://`` prefix).
    redshift_table_name: target table, e.g. 'schema.table'.
    append: if False, first create the table from the CSV's columns via
        pandas_redshift; if True, load into an existing table.
    bucket: S3 bucket that holds the CSV.
    credentials_file: JSON file with the AWS keys and Redshift settings.
    Redshift allows at most 1600 columns per table.
    """
    with open(credentials_file) as raw_file:
        creds = json.load(raw_file)

    # CONNECT TO S3 AND TO REDSHIFT
    pr.connect_to_s3(aws_access_key_id      = creds["aws_access_key_id_grp"],
                     aws_secret_access_key  = creds["aws_secret_access_key_grp"],
                     bucket                 = bucket,
                     subdirectory           = None)

    pr.connect_to_redshift(dbname   = creds['db.redshift.db_name'],
                           host     = creds['db.redshift.host'],
                           port     = creds['db.redshift.port'],
                           user     = creds['db.redshift.user'],
                           password = creds['db.redshift.pass'])
    try:
        if not append:
            # Only table creation needs the data (to derive the schema), so
            # the CSV is not downloaded when appending.
            data = pd.read_csv(f"s3://{bucket}/{file_path}")
            # Make the column names acceptable to Redshift before creating the table.
            data_frame = pr.validate_column_names(data)
            pr.create_redshift_table(data_frame, redshift_table_name)

        # Issue the COPY from S3 into the table.
        pr.s3_to_redshift(redshift_table_name, file_path)
    finally:
        # Always release the Redshift connection, even if a step above fails.
        pr.close_up_shop()

In [48]:
# file_path = "fraud_detection/data/processed/train/data_2020-06-01_from_2020-02-01_to_2020-03-01_.csv"
# redshift_table_name = 'dev_gasia.test_direct'
# write_to_redshift_from_s3(file_path, redshift_table_name, append = False, bucket = "disqo-data-science-dev")

In [None]:
# data_1 -> data_auto_2020-06-01_from_2020-02-01_to_2020-03-01_
# model_2 -> data_2020-07-01_from_2020-03-01_to_2020-04-01_
# model_3

In [None]:
# session
# 1100
# 2000

In [None]:
# dev_fraud
# fraud_input: model_1 Feb -> Apr, model_2 Feb -> Apr
# fraud_output
# redeem_input

# email: data cut
# append performance

# cut_prob, without_cut_prob, autoencoder, convolution