In [1]:
import os
import sys
import base64
import json
import boto3
from botocore.exceptions import ClientError
import requests
import pandas as pd
from datetime import datetime, timedelta
from flatten_json import flatten

sys.path.append(os.path.abspath("../"))
from src.common.dynamo_custom_functions import apply_schema
from src.common.dynamo_custom_functions import get_actual_dtypes

In [2]:
# aws sso login --profile beta
os.environ.setdefault("AWS_PROFILE", "prod")
session = boto3.Session(profile_name="prod")

In [3]:
def scan_dynamodb_table(table: str, columns: list = None) -> list:
    """
    Scans a DynamoDB table and retrieves only the specified columns.
    If no columns are specified, retrieves all columns.

    Args:
        table (str): The name of the DynamoDB table to scan.
        columns (list, optional): A list of column names (attributes) to retrieve in the scan.
                                  If None or empty, retrieves all columns.

    Returns:
        list: A list of items retrieved from the table, each containing only the specified columns,
              or all columns if none are specified.
    """
    client = boto3.client("dynamodb")

    scan_kwargs = {"TableName": table}

    if columns:
        # Define placeholders for reserved words in columns
        expression_attribute_names = {}
        projection_expression = []

        # Iterate through columns and replace any reserved keywords
        for column in columns:
            if column.lower() in [
                "name",
                "date",
                "value",
                "type",
                "year",
            ]:  # Add more reserved keywords as needed
                placeholder = f"#{column}"
                expression_attribute_names[placeholder] = column
                projection_expression.append(placeholder)
            else:
                projection_expression.append(column)

        scan_kwargs["ProjectionExpression"] = ",".join(projection_expression)
        if expression_attribute_names:
            scan_kwargs["ExpressionAttributeNames"] = expression_attribute_names

    data = []
    done = False
    start_key = None

    while not done:
        if start_key:
            scan_kwargs["ExclusiveStartKey"] = start_key

        print(f"Scanning DynamoDB table: {table}")

        response = client.scan(**scan_kwargs)
        data.extend(response.get("Items", []))  # Append items to data list

        # Check if there's more data to scan
        start_key = response.get("LastEvaluatedKey")
        done = start_key is None

    return data

In [4]:
def scan_dynamodb_table_two_days_ago(table: str, columns: list = None) -> list:
    """
    Scans a DynamoDB table and retrieves only the specified columns.
    If no columns are specified, retrieves all columns.
    Filters the results to get only the items updated in the last two days.

    Args:
        table (str): The name of the DynamoDB table to scan.
        columns (list, optional): A list of column names (attributes) to retrieve in the scan.
                                  If None or empty, retrieves all columns.

    Returns:
        list: A list of items retrieved from the table, each containing only the specified columns,
              or all columns if none are specified.
    """
    client = boto3.client("dynamodb")

    # Calculate the timestamp for two days ago
    two_days_ago = datetime.utcnow() - timedelta(days=2)
    two_days_ago_iso = two_days_ago.strftime("%Y-%m-%dT%H:%M:%SZ")

    scan_kwargs = {
        "TableName": table,
        "FilterExpression": "updatedAt >= :two_days_ago",
        "ExpressionAttributeValues": {":two_days_ago": {"S": two_days_ago_iso}},
    }

    if columns:
        # Define placeholders for reserved words in columns
        expression_attribute_names = {}
        projection_expression = []

        # Iterate through columns and replace any reserved keywords
        for column in columns:
            if column.lower() in [
                "name",
                "date",
                "value",
                "type",
                "year",
            ]:  # Add more reserved keywords as needed
                placeholder = f"#{column}"
                expression_attribute_names[placeholder] = column
                projection_expression.append(placeholder)
            else:
                projection_expression.append(column)

        scan_kwargs["ProjectionExpression"] = ",".join(projection_expression)
        if expression_attribute_names:
            scan_kwargs["ExpressionAttributeNames"] = expression_attribute_names

    data = []
    done = False
    start_key = None

    while not done:
        if start_key:
            scan_kwargs["ExclusiveStartKey"] = start_key

        print(f"Scanning DynamoDB table: {table}")

        response = client.scan(**scan_kwargs)
        data.extend(response.get("Items", []))  # Append items to data list

        # Check if there's more data to scan
        start_key = response.get("LastEvaluatedKey")
        done = start_key is None

    return data

In [5]:
def convert_to_dataframe(data):
    # Helper function to extract the actual value based on the key type
    def extract_value(d):
        if "S" in d:
            return d["S"]
        elif "N" in d:
            return float(d["N"])  # Convert numeric values to float
        elif "BOOL" in d:
            return d["BOOL"]
        return None  # Handle unexpected cases

    # Transform the list of records into a format suitable for DataFrame
    transformed_data = []
    for record in data:
        transformed_record = {
            key: extract_value(value) for key, value in record.items()
        }
        transformed_data.append(transformed_record)

    # Convert the transformed data to a DataFrame
    df = pd.DataFrame(transformed_data)
    return df

In [6]:
# Define a function to safely convert the timestamp
def safe_convert_timestamp(x):
    try:
        # Convert to datetime if within a reasonable range
        if 0 < x < 4102444800:  # This is up to year 2100
            return datetime.fromtimestamp(x).strftime("%Y-%m-%d")
        else:
            return None  # Return None for out-of-range values
    except Exception:
        return None

# final_df["updated_at"] = final_df["updated_at"] / 1000
# # Apply the conversion function
# final_df["updated_at"] = final_df["updated_at"].apply(safe_convert_timestamp)

# print(final_df["updated_at"])

In [7]:
data = scan_dynamodb_table(
    "sls-ddb-investments-sell"
)
data

Scanning DynamoDB table: sls-ddb-investments-sell


[{'encodedKey': {'S': '8a5e2d14836856a4018369873b2802e0'},
  'providerAccountId': {'S': '2'},
  'expectedSettlementDate': {'S': '2023-03-15T00:24:56.957Z'},
  'orderedAt': {'S': '2023-03-05T00:24:56.957Z'},
  'orderId': {'S': '21D94940E277F5282'},
  'status': {'S': 'AwaitingOrderConfirmation'},
  'tenantId': {'S': 'nomo'},
  'trace': {'L': [{'S': 'PlaceOrderCreated'},
    {'S': 'AllFundsAccountCreated'},
    {'S': 'PlaceOrderSuccess'},
    {'S': 'AwaitingOrderConfirmation'}]},
  'createdAt': {'N': '1677975896975'},
  'pricePerShareOnSell': {'M': {'value': {'N': '134.736577'},
    'currency': {'S': 'USD'}}},
  'providerSellId': {'S': '29'},
  'updatedAt': {'N': '1677975900474'},
  'userId': {'S': 'w1bzb16fXndSPP1iKMDJNc'},
  'amount': {'M': {'value': {'N': '0'}, 'currency': {'S': 'USD'}}},
  'provider': {'S': 'allFunds'},
  'providerInvestmentId': {'S': '218413'},
  'id': {'S': '0ee43b7f-0405-45b6-bf73-d2b7f4c8ef7f'},
  'traceId': {'S': 'deec6923-27ca-4aa9-9c29-7c7109ff1096'},
  'shares

In [8]:
frames = []
for record in data:
    flat_json = flatten(record)
    frames.append(flat_json)

In [9]:
frames

[{'encodedKey_S': '8a5e2d14836856a4018369873b2802e0',
  'providerAccountId_S': '2',
  'expectedSettlementDate_S': '2023-03-15T00:24:56.957Z',
  'orderedAt_S': '2023-03-05T00:24:56.957Z',
  'orderId_S': '21D94940E277F5282',
  'status_S': 'AwaitingOrderConfirmation',
  'tenantId_S': 'nomo',
  'trace_L_0_S': 'PlaceOrderCreated',
  'trace_L_1_S': 'AllFundsAccountCreated',
  'trace_L_2_S': 'PlaceOrderSuccess',
  'trace_L_3_S': 'AwaitingOrderConfirmation',
  'createdAt_N': '1677975896975',
  'pricePerShareOnSell_M_value_N': '134.736577',
  'pricePerShareOnSell_M_currency_S': 'USD',
  'providerSellId_S': '29',
  'updatedAt_N': '1677975900474',
  'userId_S': 'w1bzb16fXndSPP1iKMDJNc',
  'amount_M_value_N': '0',
  'amount_M_currency_S': 'USD',
  'provider_S': 'allFunds',
  'providerInvestmentId_S': '218413',
  'id_S': '0ee43b7f-0405-45b6-bf73-d2b7f4c8ef7f',
  'traceId_S': 'deec6923-27ca-4aa9-9c29-7c7109ff1096',
  'shares_N': '0.036881'},
 {'encodedKey_S': '8a5e019f8362c19901836986433407d1',
  'p

In [10]:
final_df = pd.DataFrame(frames)

In [11]:
final_df.columns

Index(['encodedKey_S', 'providerAccountId_S', 'expectedSettlementDate_S',
       'orderedAt_S', 'orderId_S', 'status_S', 'tenantId_S', 'trace_L_0_S',
       'trace_L_1_S', 'trace_L_2_S', 'trace_L_3_S', 'createdAt_N',
       'pricePerShareOnSell_M_value_N', 'pricePerShareOnSell_M_currency_S',
       'providerSellId_S', 'updatedAt_N', 'userId_S', 'amount_M_value_N',
       'amount_M_currency_S', 'provider_S', 'providerInvestmentId_S', 'id_S',
       'traceId_S', 'shares_N', 'settlementDate_S', 'settlementShares_N',
       'trace_L_0_M_createdAt_S', 'trace_L_0_M_status_S',
       'trace_L_1_M_createdAt_S', 'trace_L_1_M_status_S',
       'trace_L_2_M_createdAt_S', 'trace_L_2_M_status_S',
       'trace_L_3_M_createdAt_S', 'trace_L_3_M_status_S', 'investmentName_S',
       'settlementCurrency_S', 'confirmedAt_S',
       'confirmation_M_generalOperationData_M_confirmationDate_S',
       'confirmation_M_generalOperationData_M_ccc_S',
       'confirmation_M_generalOperationData_M_calculatedCros

In [12]:
# get schema and dataframe for write to athena
athena_schema = get_actual_dtypes(final_df)

In [13]:
athena_schema

{'encodedKey_S': 'string',
 'providerAccountId_S': 'int',
 'expectedSettlementDate_S': 'timestamp',
 'orderedAt_S': 'timestamp',
 'orderId_S': 'string',
 'status_S': 'string',
 'tenantId_S': 'string',
 'trace_L_0_S': 'string',
 'trace_L_1_S': 'string',
 'trace_L_2_S': 'string',
 'trace_L_3_S': 'string',
 'createdAt_N': 'bigint',
 'pricePerShareOnSell_M_value_N': 'double',
 'pricePerShareOnSell_M_currency_S': 'string',
 'providerSellId_S': 'int',
 'updatedAt_N': 'string',
 'userId_S': 'string',
 'amount_M_value_N': 'double',
 'amount_M_currency_S': 'string',
 'provider_S': 'string',
 'providerInvestmentId_S': 'int',
 'id_S': 'string',
 'traceId_S': 'string',
 'shares_N': 'double',
 'settlementDate_S': 'string',
 'settlementShares_N': 'double',
 'trace_L_0_M_createdAt_S': 'timestamp',
 'trace_L_0_M_status_S': 'string',
 'trace_L_1_M_createdAt_S': 'timestamp',
 'trace_L_1_M_status_S': 'string',
 'trace_L_2_M_createdAt_S': 'timestamp',
 'trace_L_2_M_status_S': 'string',
 'trace_L_3_M_creat