
# Glue Studio Notebook
You are now running a **Glue Studio** notebook. Before you can use it, you *must* start an interactive session.

## Available Magics
|          Magic              |   Type       |                                                                        Description                                                                        |
|-----------------------------|--------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------|
| %%configure                 |  Dictionary  |  A JSON-formatted dictionary consisting of all configuration parameters for a session. Each parameter can be specified here or through individual magics. |
| %profile                    |  String      |  Specify a profile in your AWS configuration to use as the credentials provider.                                                                          |
| %iam_role                   |  String      |  Specify an IAM role to execute your session with.                                                                                                        |
| %region                     |  String      |  Specify the AWS region in which to initialize a session.                                                                                                 |
| %session_id                 |  String      |  Returns the session ID for the running session.                                                                                                          |
| %connections                |  List        |  Specify a comma-separated list of connections to use in the session.                                                                                     |
| %additional_python_modules  |  List        |  Comma-separated list of pip packages, S3 paths or private pip arguments.                                                                                 |
| %extra_py_files             |  List        |  Comma-separated list of additional Python files from S3.                                                                                                 |
| %extra_jars                 |  List        |  Comma-separated list of additional JARs to include in the cluster.                                                                                       |
| %number_of_workers          |  Integer     |  The number of workers of a defined worker_type that are allocated when a job runs. worker_type must be set too.                                          |
| %worker_type                |  String      |  Standard, G.1X, *or* G.2X. number_of_workers must be set too. Default is G.1X.                                                                           |
| %glue_version               |  String      |  The version of Glue to be used by this session. Currently, the only valid options are 2.0 and 3.0 (e.g. %glue_version 2.0).                              |
| %security_config            |  String      |  Define a security configuration to be used with this session.                                                                                            |
| %%sql                       |  String      |  Run SQL code. All lines after the initial %%sql magic will be passed as part of the SQL code.                                                            |
| %streaming                  |  String      |  Changes the session type to Glue Streaming.                                                                                                              |
| %etl                        |  String      |  Changes the session type to Glue ETL.                                                                                                                    |
| %status                     |              |  Returns the status of the current Glue session including its duration, configuration and executing user / role.                                          |
| %stop_session               |              |  Stops the current session.                                                                                                                               |
| %list_sessions              |              |  Lists all currently running sessions by name and ID.                                                                                                     |
| %spark_conf                 |  String      |  Specify custom spark configurations for your session. E.g. %spark_conf spark.serializer=org.apache.spark.serializer.KryoSerializer                       |
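
As a minimal sketch of how the magics in the table combine, a first cell might set the session parameters before any Spark code runs. The values below (region, worker count) are illustrative assumptions, not settings taken from this notebook.

```
%region us-east-1
%glue_version 3.0
%worker_type G.1X
%number_of_workers 2
```

Because `%number_of_workers` and `%worker_type` depend on each other, the sketch sets both.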

In [None]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
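# Reuse the running SparkContext (or create one) and wrap it in a GlueContext.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
# Expose the SparkSession and create the Job object for this notebook.
spark = glueContext.spark_session
job = Job(glueContext)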

In [None]:
import datetime
from pyspark.sql.functions import col, when, count, isnan, concat
from pyspark.sql.types import DoubleType
# Date suffix used in the input and output file names: tomorrow's date.
end_date = datetime.date.today() + datetime.timedelta(days=1)
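
The cell above sets `end_date` to tomorrow's date, and the other cells interpolate it into S3 keys. A `datetime.date` renders as `YYYY-MM-DD` when formatted into a string, so the resulting file names carry an ISO date. The sketch below illustrates this with a hypothetical helper, `input_key_for`, which is not part of the notebook, and a fixed run date instead of `today()`:

```python
import datetime

# Key layout copied from the S3 read cell in this notebook.
KEY_TEMPLATE = "defillama-analytics/raw-data/pools-yield/results/llama_pools_yield_{end_date}.json"


def input_key_for(run_date: datetime.date) -> str:
    """Hypothetical helper: build the input key for a given run date."""
    end_date = run_date + datetime.timedelta(days=1)  # same one-day offset as the notebook
    return KEY_TEMPLATE.format(end_date=end_date)


print(input_key_for(datetime.date(2023, 1, 31)))
```

Passing the date in as a parameter, rather than calling `today()` inside the function, makes it possible to re-run the job for a past date.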

In [None]:
# Script generated for node Amazon S3
pools_yield_dyf = glueContext.create_dynamic_frame.from_options(
    format_options={"multiline": False},
    connection_type="s3",
    format="json",
    connection_options={
        "paths": [f"s3://dataspartan-test-bucket/defillama-analytics/raw-data/pools-yield/results/llama_pools_yield_{end_date}.json"],
        "recurse": False,
    },
)

In [None]:
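# pools_yield_dyf.printSchema()
# Split fields with ambiguous types into one column per type (make_cols), then flatten the nested JSON.
pools_yield_dyfc = pools_yield_dyf.resolveChoice(choice="make_cols").relationalize(
    "root", "s3://dataspartan-test-bucket/unspecified/temp-dir/"
)
# Take the flattened pools_yield table and convert it to a Spark DataFrame.
pools_yield_dyfc_root_pools_yield = pools_yield_dyfc.select('root_pools_yield')
pools_yield_dyf_root_pools_yield = pools_yield_dyfc_root_pools_yield.toDF()
# Replace dots in column names with underscores so columns can be referenced without backticks.
new_cols = [column.replace('.', '_') for column in pools_yield_dyf_root_pools_yield.columns]
pools_yield_dyf_root_pools_yield = pools_yield_dyf_root_pools_yield.toDF(*new_cols)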
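
`relationalize` names flattened fields by joining the nested path with dots, and Spark treats a dot in a column reference as struct-field access unless the name is quoted with backticks. That is the likely reason the cell above rewrites the names. A minimal pure-Python sketch of the renaming step follows; the sample column names are assumptions chosen to resemble the flattened output:

```python
# Sample names are assumptions for illustration, not the notebook's full schema.
flattened_columns = [
    "id",
    "index",
    "pools_yield.val.chain",
    "pools_yield.val.predictions.predictedClass",
]

# Same transformation as the notebook: replace every dot with an underscore.
renamed_columns = [name.replace(".", "_") for name in flattened_columns]

print(renamed_columns)
```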

In [None]:
remainCols = ['id', 'index', 'pools_yield_val_chain', 'pools_yield_val_project', 'pools_yield_val_symbol', 'pools_yield_val_pool', 'pools_yield_val_stablecoin', 'pools_yield_val_ilRisk', 'pools_yield_val_exposure', 'pools_yield_val_predictions_predictedClass', 'pools_yield_val_predictions_binnedConfidence', 'pools_yield_val_poolMeta', 'pools_yield_val_count', 'pools_yield_val_outlier', 'pools_yield_val_rewardTokens', 'pools_yield_val_underlyingTokens']
fillnaCols = {'pools_yield_val_tvlUsd_long':'0','pools_yield_val_tvlUsd_int':'0','pools_yield_val_apyBase_double':'0', 'pools_yield_val_apyBase_int':'0','pools_yield_val_apyReward_double':'0', 'pools_yield_val_apyReward_int':'0','pools_yield_val_apy_double':'0', 'pools_yield_val_apy_int':'0','pools_yield_val_apyPct1D_double':'0', 'pools_yield_val_apyPct1D_int':'0','pools_yield_val_apyPct7D_double':'0', 'pools_yield_val_apyPct7D_int':'0','pools_yield_val_apyPct30D_double':'0', 'pools_yield_val_apyPct30D_int':'0','pools_yield_val_predictions_predictedProbability_double':'0', 'pools_yield_val_predictions_predictedProbability_int':'0','pools_yield_val_mu_double':'0','pools_yield_val_mu_int':'0','pools_yield_val_sigma_double':'0','pools_yield_val_sigma_int':'0'}
tvlCols = ['pools_yield_val_tvlUsd_long','pools_yield_val_tvlUsd_int', 'pools_yield_val_tvlUsd']
apyBaseCols = ['pools_yield_val_apyBase_double', 'pools_yield_val_apyBase_int', 'pools_yield_val_apyBase']
apyRewardCols = ['pools_yield_val_apyReward_double', 'pools_yield_val_apyReward_int', 'pools_yield_val_apyReward']
apyCols = ['pools_yield_val_apy_double', 'pools_yield_val_apy_int', 'pools_yield_val_apy']
apypct1dCols = ['pools_yield_val_apyPct1D_double', 'pools_yield_val_apyPct1D_int', 'pools_yield_val_apyPct1D']
apypct7dCols = ['pools_yield_val_apyPct7D_double', 'pools_yield_val_apyPct7D_int', 'pools_yield_val_apyPct7D']
apypct30dCols = ['pools_yield_val_apyPct30D_double', 'pools_yield_val_apyPct30D_int', 'pools_yield_val_apyPct30D']
predprobCols = ['pools_yield_val_predictions_predictedProbability_double', 'pools_yield_val_predictions_predictedProbability_int', 'pools_yield_val_predictions_predictedProbability']
muCols = ['pools_yield_val_mu_double','pools_yield_val_mu_int', 'pools_yield_val_mu']
sigmaCols = ['pools_yield_val_sigma_double','pools_yield_val_sigma_int', 'pools_yield_val_sigma']
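
Each list above follows the same pattern: the two type-specific columns produced by `resolveChoice(choice="make_cols")`, followed by the name of the merged output column. As a sketch, the lists could be generated rather than typed out. `choice_columns` is a hypothetical helper, not part of the notebook:

```python
PREFIX = "pools_yield_val_"


def choice_columns(base, first_type="double"):
    """Return [<first_type> column, int column, merged output column] for a field."""
    return [f"{PREFIX}{base}_{first_type}", f"{PREFIX}{base}_int", f"{PREFIX}{base}"]


# tvlUsd is the only field whose first typed column is a long rather than a double.
tvlCols = choice_columns("tvlUsd", first_type="long")
apyCols = choice_columns("apy")

print(tvlCols)
print(apyCols)
```

Generating the names from one base string reduces the chance of a typo in any single list.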

In [None]:
pools_yield_dyf_root_pools_yield2 = pools_yield_dyf_root_pools_yield.fillna(fillnaCols).select(
    ((col(tvlCols[0]) + col(tvlCols[1]))).alias(tvlCols[2]),
    ((col(apyBaseCols[0]) + col(apyBaseCols[1]))/100).alias(apyBaseCols[2]),
    ((col(apyRewardCols[0]) + col(apyRewardCols[1]))/100).alias(apyRewardCols[2]),
    ((col(apyCols[0]) + col(apyCols[1]))/100).alias(apyCols[2]),
    ((col(apypct1dCols[0]) + col(apypct1dCols[1]))/100).alias(apypct1dCols[2]),
    ((col(apypct7dCols[0]) + col(apypct7dCols[1]))/100).alias(apypct7dCols[2]),
    ((col(apypct30dCols[0]) + col(apypct30dCols[1]))/100).alias(apypct30dCols[2]),
    ((col(predprobCols[0]) + col(predprobCols[1]))/100).alias(predprobCols[2]),
    ((col(muCols[0]) + col(muCols[1]))/100).alias(muCols[2]),
    ((col(sigmaCols[0]) + col(sigmaCols[1]))/100).alias(sigmaCols[2]),
    *remainCols,
)
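
The `select` above relies on how `make_cols` splits a field: each value presumably lands in one of the typed columns while the other stays null. Filling nulls with 0 and adding the pair therefore recovers the original value, and every merged field except TVL is then divided by 100. A plain-Python sketch of the same arithmetic on single rows, with a hypothetical helper and made-up sample values:

```python
def merge_typed_pair(row, base, types=("double", "int"), divisor=1):
    """Sum the typed variants of a field, treating missing or null values as 0."""
    total = sum(row.get(f"{base}_{t}") or 0 for t in types)
    return total / divisor


# Sample rows are made up: the value sits in one typed column, the other is null.
row_a = {"apy_double": 12.5, "apy_int": None}
row_b = {"apy_double": None, "apy_int": 3}

print(merge_typed_pair(row_a, "apy", divisor=100))
print(merge_typed_pair(row_b, "apy", divisor=100))
```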

In [None]:
def blank_as_null(x):
    """Return a column expression that turns empty strings in column x into nulls."""
    return when(col(x) != "", col(x)).otherwise(None)
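
In Spark, comparing a null with `!=` yields null rather than true, so `when` falls through to `otherwise(None)` and existing nulls stay null. The plain-Python analogue below mirrors that behavior. It is a hypothetical function that operates on single values, not columns:

```python
def blank_as_null_value(value):
    """Map empty strings to None; leave every other value, including None, unchanged."""
    return None if value is None or value == "" else value


# Sample values are made up for illustration.
samples = ["Down", "", None]
print([blank_as_null_value(v) for v in samples])
```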

In [None]:
pools_yield_dyf_root_pools_yield2 = pools_yield_dyf_root_pools_yield2.withColumn("pools_yield_val_predictions_predictedClass", blank_as_null("pools_yield_val_predictions_predictedClass"))

In [None]:
# Repartition to one partition so Spark writes a single CSV part file. Approach based on:
# https://medium.com/geekculture/aws-glue-simple-job-to-write-part-csv-files-to-a-single-file-d805eddbe641
pools_yield_dyf_root_pools_yield2.repartition(1).write.mode('overwrite').csv(
    f's3://dataspartan-test-bucket/defillama-analytics/raw-data/pools-yield/unnested/dump/root_pools_yield_{end_date}', header=True)

In [None]:
import boto3
client = boto3.client('s3')
BUCKET_NAME = 'dataspartan-test-bucket'
PREFIX_start = 'defillama-analytics/raw-data/pools-yield/unnested/'
PREFIX_end = f'/root_pools_yield_{end_date}'
# List the objects Spark wrote under the dump folder for this run.
response = client.list_objects(Bucket=BUCKET_NAME, Prefix=PREFIX_start + 'dump' + PREFIX_end)
# Find the key of the CSV part file created by this Glue job, skipping any non-CSV marker objects.
name = next(obj['Key'] for obj in response['Contents'] if obj['Key'].endswith('.csv'))
# Copy the part file to a fixed name; change the target key to rename the CSV.
client.copy_object(
    Bucket=BUCKET_NAME,
    CopySource=BUCKET_NAME + '/' + name,
    Key=PREFIX_start + 'root_pools_yield/root_pools_yield_single_file.csv',
)
# Copy the same file to the dataspartan-google-drive bucket.
client.copy_object(
    Bucket='dataspartan-google-drive',
    CopySource=BUCKET_NAME + '/' + name,
    Key='root_pools_yield_single_file.csv',
)
# client.delete_object(Bucket=BUCKET_NAME, Key=name)
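
The lookup above reads `response['Contents']` directly. `list_objects` omits the `Contents` key when no object matches the prefix, and the listing may contain objects other than the CSV part file. A defensive sketch follows, using a hypothetical helper `find_csv_key` and a hand-written response dictionary in place of a real S3 call:

```python
def find_csv_key(response):
    """Return the first key ending in .csv from a list_objects-style response."""
    for obj in response.get("Contents", []):
        if obj["Key"].endswith(".csv"):
            return obj["Key"]
    raise FileNotFoundError("no CSV object found under the requested prefix")


# Hand-written stand-in for a list_objects response; the keys are illustrative.
fake_response = {
    "Contents": [
        {"Key": "dump/root_pools_yield_2023-02-01/_SUCCESS"},
        {"Key": "dump/root_pools_yield_2023-02-01/part-00000.csv"},
    ]
}

print(find_csv_key(fake_response))
```

Raising an explicit error when nothing matches makes a failed or empty write visible before the copy step runs.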