In [None]:
# Example company IDs
# 0C000008MR, 0C00000AXM, 0C00000BUG, 0C000007R3, 0C000009JZ

# Example share class IDs (user_shareclassid below is a subset of this list)
# 0P0000A5G3, 0P00006WAE, 0P000002H5, 0P00007O7R, 0P000003PC

# Prerequisite: 'pyathena' must be installed in the current Docker image.

## 1. User Inputs

In [28]:
# Share classes to audit, and the date window (YYYY-MM-DD, inclusive at both ends)
# used when filtering the raw tables.
user_shareclassid = ['0P0000A5G3', '0P00007O7R', '0P000003PC']
user_start_date, user_end_date = '2002-05-06', '2007-05-07'

# (start, end) pair, unpacked into Spark's `between` filter later on.
dates = (user_start_date, user_end_date)

In [29]:
import glob
import os

import pandas as pd
import pyathena
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

pd.set_option('display.max_columns', 100)

In [30]:
# Local Spark session used to read the raw parquet tables.
# In local mode the master string fixes the number of worker threads: plain
# 'local' runs on ONE thread, so 'spark.cores.max' alone never gave 6 cores.
SPARK_CORES = 6

spark = (
    SparkSession.builder
    .master(f'local[{SPARK_CORES}]')
    .appName('Raw_data_analysis')
    .config('spark.driver.memory', '20G')
    # NOTE(review): kept from the original config; presumably no effect in local
    # mode, where tasks run inside the driver JVM. Confirm before relying on them.
    .config("spark.executor.memory", "5gb")
    .config("spark.cores.max", str(SPARK_CORES))
    .getOrCreate()
)

In [6]:
# Federate through ADFS (interactive: prompts for password and role) and save
# temporary AWS keys under the `default` profile in /root/.aws/credentials.
# The keys expire (the recorded run reports a 12-hour session); rerun to refresh.
%run /Morningstar/amazon_adfs/aws-sts-forms-v2.py

Morningstar Domain Username (root): schava3
Federating through ADFS Prod using script version 2.0.5
Username: schava3@morningstar.com
Region: us-east-1
Profile: default
Keyring: OS error: No module named 'keyring'
········
Please choose the role you would like to assume:
0) quant-non-prod-operator (809853243180)
1) quant-prod-readonly (763485997594)
2) apg-non-prod-operator (957772357715)
3) bit-non-prod-dev-general (309603612345)
4) bit-prod-production (156286860437)
5) bit-prod-uat (156286860437)
Role Integer eg. 0,1,2,all: 0
----------------------------------------------------------------
Keys saved under profile: [ default ] 
Keys File: /root/.aws/credentials
Sessions Expiration: 2020-09-28 15:47:33+00:00. (12 hour)
After this time, you may safely rerun this script to refresh your access key pair.
To use this credential, create an AWS sessionin the CLI using the profile name default
Examples:
  Boto3: session = boto3.session.Session(profile_name='default')
  AWScli: $ aws --profile

  config.readfp(open(filename))


## 2. Connect to the Athena database and resolve IDs and time indices

In [31]:
# Athena database holding the risk-model tables for this model run.
db_name = 'rm_2020_08_18_prod_glb_eq_usd_fof'

# Athena writes query results to this S3 staging location.
conn = pyathena.connect(s3_staging_dir='s3://risk-model-2.0-temp', region_name='us-east-1')

# Translate the user's calendar window into the model's integer TIMEINDEX range.
timeindex_query = pd.read_sql(
    f"""select timeindex from {db_name}.riskmodel__timeindex
        where modeldate between date '{user_start_date}' and date '{user_end_date}'
        order by timeindex""",
    conn,
)

# Without this guard an empty window fails later with an opaque IndexError.
if timeindex_query.empty:
    raise ValueError(
        f"No model dates found in {db_name}.riskmodel__timeindex between "
        f"{user_start_date} and {user_end_date}; adjust the user date window."
    )

# Inclusive [first, last] TIMEINDEX bounds; rows are sorted ascending by the query.
ti = [timeindex_query['timeindex'].iloc[0], timeindex_query['timeindex'].iloc[-1]]

In [32]:
# `shareclassid in ()` is invalid SQL, so fail early with a readable message.
if not user_shareclassid:
    raise ValueError("user_shareclassid is empty; provide at least one share class ID.")

# Render each ID as a SQL string literal, doubling any embedded single quote.
shareclass_sql_list = ", ".join(
    "'{}'".format(str(shareclass).replace("'", "''")) for shareclass in user_shareclassid
)

# Security-map rows linking the requested share classes to their r_securityid values.
r_securityid_query = pd.read_sql(
    f"select * from {db_name}.riskmodel__security_map "
    f"where shareclassid in ({shareclass_sql_list})",
    conn,
)

In [33]:
# Distinct r_securityid values for the requested share classes, nulls dropped.
# `.tolist()` yields native Python objects (unlike `list(ndarray)`, which keeps
# numpy scalars for numeric dtypes) so Spark's `Column.isin` can serialize them.
user_r_securityid = r_securityid_query['r_securityid'].dropna().unique().tolist()

In [39]:
# Start each run with an empty output folder so stale CSVs from earlier runs
# are not mixed with this run's results. The folder is created if missing,
# because the later `to_csv` calls do not create parent directories.
AUDIT_OUT_DIR = "/Morningstar/audit_trail/list_of_tables"
os.makedirs(AUDIT_OUT_DIR, exist_ok=True)
for stale_path in glob.glob(os.path.join(AUDIT_OUT_DIR, "*")):
    # os.remove raises on directories; only regular files are cleared.
    if os.path.isfile(stale_path):
        os.remove(stale_path)

## 3. Filter each raw table to the user's share classes and date window, and export to CSV

In [40]:
RAW_DATA_ROOT = "/Morningstar/risk-model-data/tmp/fakemodelid/"
AUDIT_OUT_DIR = "/Morningstar/audit_trail/list_of_tables"
SKIP_DIRS = {'buildpremia', 'etl'}


def _filter_table(raw_df):
    """Restrict one raw Spark table to the user's securities and date window.

    Returns:
        The filtered DataFrame, sorted by its date column descending when one of
        EFFECTIVEDATE / TIMEINDEX exists. If there is no date column, the
        company-filtered rows are returned unsorted. Returns None when the table
        has neither SHARECLASSID nor R_SECURITYID, so it cannot be filtered.
        (Previously such tables silently reused the prior table's data.)
    """
    columns = raw_df.columns

    if 'SHARECLASSID' in columns:
        by_company = raw_df.filter(F.col('SHARECLASSID').isin(user_shareclassid))
    elif 'R_SECURITYID' in columns:
        by_company = raw_df.filter(F.col('R_SECURITYID').isin(user_r_securityid))
    else:
        return None

    if 'EFFECTIVEDATE' in columns:
        in_window = by_company.where(F.col('EFFECTIVEDATE').between(*dates))
        return in_window.sort(F.col('EFFECTIVEDATE'), ascending=False)
    if 'TIMEINDEX' in columns:
        # `between` is inclusive on both ends, matching >= lower & <= upper.
        in_window = by_company.filter(F.col('TIMEINDEX').between(int(ti[0]), int(ti[1])))
        return in_window.sort(F.col('TIMEINDEX'), ascending=False)
    return by_company


os.makedirs(AUDIT_OUT_DIR, exist_ok=True)
skipped_tables = []
for root, dirs, _files in os.walk(RAW_DATA_ROOT, topdown=False):
    for name in dirs:
        if name in SKIP_DIRS:
            continue
        filtered = _filter_table(spark.read.parquet(os.path.join(root, name)))
        if filtered is None:
            skipped_tables.append(name)
            continue
        filtered.toPandas().to_csv(os.path.join(AUDIT_OUT_DIR, f"{name}.csv"), index=False)

# Tables that had no security-ID column and were therefore not exported.
skipped_tables
            