In [2]:
from datetime import date, timedelta
import pandas as pd
import numpy as np
import boto3
import json
import pysftp
from botocore.exceptions import ClientError
# Connection options object for pysftp.
# NOTE(review): setting hostkeys to None presumably makes pysftp skip SFTP host-key
# verification for connections that use these options — confirm this is intended.
cnopts = pysftp.CnOpts()
cnopts.hostkeys = None
from sqlalchemy import create_engine

def get_aws_creds(database):
    """Fetch the Redshift and lending read-only connection parameters from AWS SSM.

    :param database: name of an additional SSM parameter to fetch alongside the
        fixed Redshift/lending keys (callers pass the key holding the database name).
    :return: dict mapping each returned SSM parameter name to its decrypted value.
        Names SSM does not return are simply absent from the dict.
    """
    wanted_names = [
        'prod_redshift_host', 'prod_redshift_ro_password', 'prod_redshift_port', 'prod_redshift_ro_user',
        'lending_readonly_host', 'lending_readonly_pass', 'lending_readonly_port', 'lending_readonly_user',
        database,
    ]
    ssm_client = boto3.client('ssm', region_name='ap-south-1')
    response = ssm_client.get_parameters(Names=wanted_names, WithDecryption=True)
    return {entry['Name']: entry['Value'] for entry in response['Parameters']}
def get_engine(database):
    """Build a SQLAlchemy engine for one of the supported read-only databases.

    :param database: SSM parameter name that stores the database name. Supported
        values: 'prod_redshift_name', 'lending_readonly_dbname', 'UNIFIED_READ_ONLY_DB'.
    :return: engine created by ``create_engine`` from a ``postgresql+psycopg2`` URL.
    :raises ValueError: if ``database`` is not one of the supported values
        (previously this surfaced as an UnboundLocalError on ``engine``).
    """
    # SSM parameter names (user, password, host, port) for the databases whose
    # credentials are fetched by get_aws_creds().
    credential_keys = {
        'prod_redshift_name': ('prod_redshift_ro_user', 'prod_redshift_ro_password',
                               'prod_redshift_host', 'prod_redshift_port'),
        'lending_readonly_dbname': ('lending_readonly_user', 'lending_readonly_pass',
                                    'lending_readonly_host', 'lending_readonly_port'),
    }

    if database == 'UNIFIED_READ_ONLY_DB':
        # get_aws_creds() never requests the UNIFIED_READ_ONLY_* parameters, so building
        # the URL from its result would use None for user/password/host/port.
        # get_engine_unified() fetches exactly those keys and builds the same URL shape.
        return get_engine_unified(database)

    if database not in credential_keys:
        raise ValueError('Unsupported database: {!r}'.format(database))

    ssm_secrets = get_aws_creds(database)
    user_key, password_key, host_key, port_key = credential_keys[database]
    engine_link = 'postgresql+psycopg2://{}:{}@{}:{}/{}'.format(
        ssm_secrets.get(user_key),
        ssm_secrets.get(password_key),
        ssm_secrets.get(host_key),
        ssm_secrets.get(port_key),
        ssm_secrets.get(database),
    )
    return create_engine(engine_link)


def get_engine_unified(database):
    """Build a SQLAlchemy engine from the UNIFIED_READ_ONLY_* SSM parameters.

    :param database: name of the SSM parameter that stores the database name.
    :return: engine created by ``create_engine`` from a ``postgresql+psycopg2`` URL.
    """
    wanted_names = [
        'UNIFIED_READ_ONLY_USER', 'UNIFIED_READ_ONLY_PASSWORD',
        'UNIFIED_READ_ONLY_HOST', 'UNIFIED_READ_ONLY_PORT',
        database,
    ]
    ssm_client = boto3.client('ssm', region_name='ap-south-1')
    response = ssm_client.get_parameters(Names=wanted_names, WithDecryption=True)
    secrets = {entry['Name']: entry['Value'] for entry in response['Parameters']}

    # Any parameter SSM did not return is read as None by .get().
    url = 'postgresql+psycopg2://{}:{}@{}:{}/{}'.format(
        secrets.get('UNIFIED_READ_ONLY_USER'),
        secrets.get('UNIFIED_READ_ONLY_PASSWORD'),
        secrets.get('UNIFIED_READ_ONLY_HOST'),
        secrets.get('UNIFIED_READ_ONLY_PORT'),
        secrets.get(database),
    )
    return create_engine(url)

def get_engine_banking(database):
    """Build a SQLAlchemy engine from the PORTAL_READONLY_* SSM parameters.

    :param database: database name, used verbatim in the connection URL
        (it is not looked up in SSM).
    :return: engine created by ``create_engine``; the URL carries no explicit port.
    """
    ssm_client = boto3.client('ssm', region_name='ap-south-1')
    response = ssm_client.get_parameters(
        Names=['PORTAL_READONLY_USER', 'PORTAL_READONLY_HOST', 'PORTAL_READONLY_PASSWORD'],
        WithDecryption=True,
    )
    secrets = {entry['Name']: entry['Value'] for entry in response['Parameters']}

    url = 'postgresql+psycopg2://{}:{}@{}/{}'.format(
        secrets.get('PORTAL_READONLY_USER'),
        secrets.get('PORTAL_READONLY_PASSWORD'),
        secrets.get('PORTAL_READONLY_HOST'),
        database,
    )
    return create_engine(url)


# Module-level engines shared by the query helpers in this notebook.
# Each call reads credentials from SSM at the time this cell runs.
engine_dw2 = get_engine('prod_redshift_name')
engine_lending = get_engine('lending_readonly_dbname')
engine_unified = get_engine_unified('UNIFIED_READ_ONLY_DB')
# 'finbox_dashboard' is passed through as the literal database name.
engine_bank_connect = get_engine_banking('finbox_dashboard')


def get_yogesh_aws_creds():
    """Fetch the 'yogesh_access_key' / 'yogesh_secret_key' parameters from AWS SSM.

    :return: dict mapping each returned SSM parameter name to its decrypted value.
    """
    ssm_client = boto3.client('ssm', region_name='ap-south-1')
    response = ssm_client.get_parameters(
        Names=['yogesh_access_key', 'yogesh_secret_key'],
        WithDecryption=True,
    )
    return {entry['Name']: entry['Value'] for entry in response['Parameters']}


def create_presigned_url(stmt_id, bank, expiration=60 * 60 * 24 * 7):
    """Generate a presigned URL for a statement PDF in the 'fsmprod' S3 bucket.

    :param stmt_id: first component of the object key
    :param bank: second component of the object key; the key is '<stmt_id>_<bank>.pdf'
    :param expiration: validity in seconds; the URL is requested with ``expiration - 60``
    :return: Presigned URL as string. If a ClientError occurs, returns None.
    """
    # Credentials are read from SSM on every call.
    creds = get_yogesh_aws_creds()
    s3_client = boto3.client(
        's3',
        aws_access_key_id=creds.get('yogesh_access_key'),
        aws_secret_access_key=creds.get('yogesh_secret_key'),
        region_name='ap-south-1',
    )
    object_key = '{}_{}.pdf'.format(stmt_id, bank)
    try:
        return s3_client.generate_presigned_url(
            'get_object',
            Params={'Bucket': 'fsmprod', 'Key': object_key},
            ExpiresIn=expiration - 60,
        )
    except ClientError as e:
        print('exception on url generating', e)
        return None

def get_ftp_password():
    """Fetch the 'IIFL_SFTP_Password' parameter from AWS SSM.

    :return: the decrypted parameter value, or None if SSM did not return it.
    """
    ssm_client = boto3.client('ssm', region_name='ap-south-1')
    response = ssm_client.get_parameters(Names=['IIFL_SFTP_Password'], WithDecryption=True)
    secrets = {entry['Name']: entry['Value'] for entry in response['Parameters']}
    return secrets.get('IIFL_SFTP_Password')

ModuleNotFoundError: No module named 'pysftp'

In [3]:
def get_from_dw2_apps():
    """Load today's distinct-package counts from the ``apps`` table via ``engine_dw2``.

    The result has one row per (subusername, client_id, created_at, sdk_version_name),
    limited to subusernames present in ``ddb_customer_id_prod`` for the current date.
    """
    query = """SELECT subusername,
       count(DISTINCT package_name) as count_apps,client_id,date(created_at) as created_at,sdk_version_name
FROM apps
WHERE subusername IN
    (select subusername from ddb_customer_id_prod where date(created_at) = current_date)
  AND date(created_At) = CURRENT_DATE
GROUP BY subusername,client_id,date(created_at),sdk_version_name"""
    return pd.read_sql(query, engine_dw2)

def get_from_dw2_sms():
    """Load today's row counts from the ``sms`` table via ``engine_dw2``.

    The result has one row per (subusername, client_id, created_at, sdk_version_name),
    limited to subusernames present in ``ddb_customer_id_prod`` for the current date.
    """
    query = """SELECT subusername,
       count(*) as count_sms,client_id,date(created_at) as created_at,sdk_version_name
FROM sms
WHERE subusername IN
    (select subusername from ddb_customer_id_prod where date(created_at) = current_date)
  AND date(created_At) = CURRENT_DATE
GROUP BY subusername,client_id,date(created_at),sdk_version_name"""
    return pd.read_sql(query, engine_dw2)

def required_table():
    """Outer-join the per-user app counts with the per-user SMS counts.

    Rows are matched on subusername, client_id, created_at and sdk_version_name;
    a row present on only one side is kept, with the other side's count missing.
    """
    join_keys = ['subusername', 'client_id', 'created_at', 'sdk_version_name']
    apps_counts = get_from_dw2_apps()
    sms_counts = get_from_dw2_sms()
    return pd.merge(apps_counts, sms_counts, on=join_keys, how='outer')

# Build the merged apps/SMS count table and show it as the cell output.
df3=required_table()
display(df3)

NameError: name 'engine_dw2' is not defined

In [4]:
# Keep only the grouping columns and the two count columns (subusername is dropped).
test_fr = df3[[
    'created_at',
    'count_apps',
    'client_id',
    'sdk_version_name',
    'count_sms',
]]

NameError: name 'df3' is not defined

In [5]:
# Per (created_at, client_id, sdk_version_name) group, compute the listed quantiles
# of the remaining count columns, then flatten the index back into columns.
df5 = (
    test_fr
    .groupby(["created_at", "client_id", "sdk_version_name"])
    .quantile([0.0, 0.10, 0.25, 0.50, 0.75, 0.90, 1.00])
    .reset_index()
)
display(df5)


NameError: name 'test_fr' is not defined

In [125]:
# Spread the quantile labels into columns: one row per group, one column per
# (count column, quantile) pair.
# 'level_3' is presumably the name reset_index() gave the unnamed quantile index level
# (the fourth level after the three group keys) — verify against df5.
df_pivot=pd.pivot_table(df5,index=["created_at","client_id","sdk_version_name"],columns=['level_3']).reset_index()

In [126]:
# Drop the outer column level so only the inner (quantile) labels remain.
df_pivot.columns = df_pivot.columns.droplevel(0)
# The result of this expression is not assigned; it is only shown as the cell output.
df_pivot.reset_index().rename_axis(None, axis=1)

Unnamed: 0,index,Unnamed: 2,Unnamed: 3,Unnamed: 4,0.0,0.1,0.25,0.5,0.75,0.9,1.0,0.0.1,0.1.1,0.25.1,0.5.1,0.75.1,0.9.1,1.0.1
0,0,2022-12-15,1,3.3,137.0,158.0,201.0,220.0,303.5,331.0,372.0,4.0,46.2,202.5,1562.0,3224.5,4853.6,6495.0
1,1,2022-12-15,122,3.2.1,266.0,284.2,311.5,357.0,357.0,357.0,357.0,,,,,,,
2,2,2022-12-15,122,3.2.19,,,,,,,,49.0,49.0,49.0,192.0,192.0,5067.6,8318.0
3,3,2022-12-15,13610,3.2.12,138.0,183.0,202.25,234.5,293.75,340.7,503.0,1.0,52.0,198.5,844.0,2837.5,6489.6,30000.0
4,4,2022-12-15,142,2.7.20,214.0,218.8,226.0,238.0,253.5,262.8,269.0,,,,,,,
5,5,2022-12-15,22383,3.2.12,255.0,278.2,313.0,355.0,382.0,415.6,438.0,52.0,572.8,1354.0,4763.0,8397.0,9622.2,10439.0
6,6,2022-12-15,25507,3.3.1,218.0,260.4,271.0,285.0,359.0,384.6,395.0,34.0,34.0,34.0,579.0,620.0,2552.4,7622.0
7,7,2022-12-15,32,3.2.1,226.0,226.7,227.75,229.5,231.25,232.3,233.0,,,,,,,
8,8,2022-12-15,32,3.3.4,303.0,303.0,303.0,303.0,354.0,371.0,371.0,53.0,53.0,104.5,259.0,259.0,259.0,259.0
9,9,2022-12-15,33,2.9.4.3-hotfix,134.0,267.0,301.0,335.0,373.0,419.0,818.0,1.0,36.0,159.0,723.0,2612.0,6289.4,60013.0


In [127]:
# Assign explicit names positionally: the three group keys, then every count_apps
# quantile column, then every count_sms quantile column.
df_pivot.columns = ['created_at', 'client_id', 'sdk_version_name'] + [
    '{}_{}'.format(metric, quantile)
    for metric in ('count_apps', 'count_sms')
    for quantile in ('0.0', '0.1', '0.25', '0.5', '0.75', '0.9', '1.0')
]

In [128]:
def get_engine(database):
    """Build a SQLAlchemy engine for one of the supported databases.

    Credentials are read from AWS SSM (region ap-south-1) with decryption enabled.

    :param database: one of 'prod_redshift_name', 'lending_readonly_dbname',
        'UNIFIED_READ_ONLY_DB', 'finbox_ds_bi_db_name' (each is the SSM parameter
        that stores the database name) or 'finbox_dashboard' (used verbatim as
        the database name, with no port in the URL).
    :return: engine created by ``create_engine`` from a ``postgresql+psycopg2`` URL.
    :raises ValueError: if ``database`` is not one of the supported values
        (previously this surfaced as an UnboundLocalError on ``engine``).
    """
    # SSM parameter names per database: (user, password, host, port).
    # A port of None means the URL is built without an explicit port.
    ssm_key_table = {
        'prod_redshift_name': ('prod_redshift_ro_user', 'prod_redshift_ro_password',
                               'prod_redshift_host', 'prod_redshift_port'),
        'lending_readonly_dbname': ('lending_readonly_user', 'lending_readonly_pass',
                                    'lending_readonly_host', 'lending_readonly_port'),
        'UNIFIED_READ_ONLY_DB': ('UNIFIED_READ_ONLY_USER', 'UNIFIED_READ_ONLY_PASSWORD',
                                 'UNIFIED_READ_ONLY_HOST', 'UNIFIED_READ_ONLY_PORT'),
        'finbox_dashboard': ('PORTAL_READONLY_USER', 'PORTAL_READONLY_PASSWORD',
                             'PORTAL_READONLY_HOST', None),
        'finbox_ds_bi_db_name': ('finbox_ds_bi_ro_user', 'finbox_ds_bi_ro_password',
                                 'finbox_ds_bi_host', 'finbox_ds_bi_port'),
    }

    # Validate before any network call so a typo fails fast with a clear message.
    if database not in ssm_key_table:
        raise ValueError('Unsupported database: {!r}'.format(database))

    user_key, password_key, host_key, port_key = ssm_key_table[database]
    # 'finbox_dashboard' is itself the database name; every other entry is the
    # name of an SSM parameter whose value is the database name.
    db_name_in_ssm = database != 'finbox_dashboard'

    ssm_keys = [user_key, password_key, host_key]
    if port_key is not None:
        ssm_keys.append(port_key)
    if db_name_in_ssm:
        ssm_keys.append(database)

    ssm_parameters = boto3.client('ssm', region_name='ap-south-1').get_parameters(
        Names=ssm_keys, WithDecryption=True)['Parameters']
    ssm_secrets = {parameter['Name']: parameter['Value'] for parameter in ssm_parameters}

    host_part = ssm_secrets.get(host_key)
    if port_key is not None:
        host_part = '{}:{}'.format(host_part, ssm_secrets.get(port_key))
    databasename = ssm_secrets.get(database) if db_name_in_ssm else database

    engine_link = 'postgresql+psycopg2://{}:{}@{}/{}'.format(
        ssm_secrets.get(user_key), ssm_secrets.get(password_key), host_part, databasename)
    return create_engine(engine_link)


# Recreate the module-level engines with the get_engine defined in this cell,
# and add the finbox_ds_bi engine used for writing results.
engine_dw2 = get_engine('prod_redshift_name')
engine_lending = get_engine('lending_readonly_dbname')
# These two still go through the dedicated helpers rather than get_engine.
engine_unified = get_engine_unified('UNIFIED_READ_ONLY_DB')
engine_bank_connect = get_engine_banking('finbox_dashboard')
engine_ds_bi = get_engine('finbox_ds_bi_db_name')

In [129]:
# Append the pivoted quantile table to 'test_cnt_apps' (index not written).
# Because if_exists='append', re-running this cell inserts the same rows again.
df_pivot.to_sql('test_cnt_apps',if_exists = 'append', con=engine_ds_bi,  index=False)

In [130]:
# Sanity check: read back the total row count of 'test_cnt_apps' and show it.
dfnew=pd.read_sql("""select  count(*) from test_cnt_apps""", engine_ds_bi)
dfnew

Unnamed: 0,count
0,33
