In [1]:
import os
import logging
import boto3
from botocore.exceptions import NoCredentialsError, PartialCredentialsError, ClientError

import pandas as pd
from dotenv import load_dotenv
from io import StringIO
import pandas as pd
import numpy as np
import json
import logging
import io
import csv
# Module-level logger; get_response_from_s3 reports S3 listing failures through it.
logger = logging.getLogger(__name__)

In [2]:
import os
import sys

# NOTE(review): machine-specific workaround that makes the project venv's packages (psycopg2)
# importable from a different kernel. Prefer installing psycopg2 into the kernel's own
# environment (e.g. `%pip install psycopg2-binary`) and deleting this. The location can be
# overridden with the FINANCY_VENV_SITE_PACKAGES environment variable.
venv_site_packages = os.getenv(
    "FINANCY_VENV_SITE_PACKAGES",
    "/Users/zemariatrindade/BTS/Financy_App/Scripts_and_Data/venv/lib/python3.11/site-packages",
)
# Append only an existing directory, and only once, so re-running this cell does not keep
# growing sys.path.
if os.path.isdir(venv_site_packages) and venv_site_packages not in sys.path:
    sys.path.append(venv_site_packages)

import psycopg2

#### Loading variables from the .env file

In [3]:
# Pull credentials and connection settings from the local .env file into the environment.
load_dotenv()

# --- AWS / S3 ---------------------------------------------------------------
# The bucket prefix really is spelled "egdar_data/"; keep it as-is.
s3_bucket = "financy"
s3_prefix_path = "egdar_data/raw/"

aws_access_key_id, aws_secret_access_key, aws_session_token = (
    os.getenv(env_name)
    for env_name in ("aws_access_key_id", "aws_secret_access_key", "aws_session_token")
)

s3 = boto3.client(
    "s3",
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
    aws_session_token=aws_session_token,
)

# --- PostgreSQL (RDS) -------------------------------------------------------
db_username, db_password, db_host, db_port, db_name = (
    os.getenv(env_name)
    for env_name in ("db_username", "db_password", "db_host", "db_port", "db_name")
)

#### Option 1: Get results for a given list of tickers

In [4]:
# Tickers listed in companies_with_anomalies.csv. The column is read as plain text with pandas'
# default NA parsing disabled: otherwise real tickers such as "NA" or "NULL" become NaN and then
# break the S3 key construction in process_json. Whitespace is stripped; blank cells and
# duplicates are dropped (first-appearance order is kept).
anomaly_tickers_df = pd.read_csv(
    "companies_with_anomalies.csv",
    usecols=["Ticker"],
    dtype={"Ticker": str},
    keep_default_na=False,
)
anomaly_tickers_list = [
    ticker
    for ticker in anomaly_tickers_df["Ticker"].str.strip().unique().tolist()
    if ticker
]
# TODO: Move all the files to the 2024-05-31/ folder

In [6]:
def _list_existing_keys(s3_client, bucket, prefix):
    """Return the set of every object key stored under ``prefix`` in ``bucket``.

    A single ``list_objects_v2`` call returns at most 1,000 keys, so the paginator is used;
    otherwise files beyond the first page would be wrongly reported as missing.
    """
    existing = set()
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        existing.update(obj["Key"] for obj in page.get("Contents", []))
    return existing


def _flatten_facts(data, ticker):
    """Flatten one company-facts document into one dict per reported value.

    The document is nested as ``facts -> reporting type -> account -> "units" -> unit -> [records]``.
    Only the columns loaded into the database are kept, in this order:
    ``end, ticker, reporting_type, form, type, units, val``. Fields absent from a record become
    ``None`` (written as an empty CSV field / SQL NULL) rather than ``{}``, which would be
    serialized as the text "{}" and break the DATE/numeric columns. Accounts without a
    ``"units"`` entry are skipped.
    """
    rows = []
    for reporting_type, accounts in data["facts"].items():
        for account_name, account_values in accounts.items():
            for unit, unit_records in account_values.get("units", {}).items():
                for record in unit_records:
                    rows.append({
                        "end": record.get("end"),
                        "ticker": ticker,
                        "reporting_type": reporting_type,
                        "form": record.get("form"),
                        "type": account_name,
                        "units": unit,
                        "val": record.get("val"),
                    })
    return rows


def process_json(list_of_tickers, prefix="egdar_data/raw/2024-05-31/", s3_client=None, bucket=None):
    """Download the company-facts JSON for each ticker from S3 and flatten it into rows.

    Parameters
    ----------
    list_of_tickers : iterable of str
        Tickers to load; each one maps to the object key ``f"{prefix}{ticker}.json"``.
    prefix : str, optional
        S3 "folder" holding the JSON snapshot (note: the bucket prefix is spelled ``egdar``).
    s3_client : optional
        boto3 S3 client; defaults to the notebook-level ``s3``.
    bucket : str, optional
        Bucket name; defaults to the notebook-level ``s3_bucket``.

    Returns
    -------
    list of dict
        One dict per reported value with keys ``end``, ``ticker``, ``reporting_type``,
        ``form``, ``type``, ``units``, ``val`` (missing fields are ``None``).

    Side effects: prints tickers without a ``facts`` key, the tickers that were processed,
    and the tickers whose file is not in S3.
    """
    # Conditional expressions only evaluate the chosen branch, so the globals are touched
    # only when no explicit client/bucket is passed.
    s3_client = s3 if s3_client is None else s3_client
    bucket = s3_bucket if bucket is None else bucket

    end_list = []
    missing_files = []  # tickers whose JSON file is not present in S3
    processed_tickers = []

    existing_keys = _list_existing_keys(s3_client, bucket, prefix)

    for ticker in list_of_tickers:
        # Use the ticker itself rather than re-parsing the key, so dotted tickers
        # (e.g. "BRK.B") are not truncated at the first ".".
        key = f"{prefix}{ticker}.json"
        if key not in existing_keys:
            missing_files.append(ticker)
            continue
        processed_tickers.append(ticker)

        # The objects are plain UTF-8 JSON (not gzip-compressed).
        body = s3_client.get_object(Bucket=bucket, Key=key)["Body"].read()
        data = json.loads(body.decode("utf-8"))

        if "facts" not in data:
            print(f"No 'facts' key for {ticker}")
            continue

        end_list.extend(_flatten_facts(data, ticker))

    print(f"\nTickers processed: {processed_tickers}")
    print("\nMissing files:", missing_files)
    return end_list

**Insight**
- Many tickers do not report under the "us-gaap" taxonomy.
- Besides US-GAAP, the `facts` also contain other taxonomies such as `dei` and `ifrs-full`.

In [7]:
def delete_table_inRDS(db_name, db_username, db_password, db_host, db_port):
    """Drop ``edgar_selected_tickers_table`` from the Postgres database, if it exists.

    ``IF EXISTS`` makes the call safe on a fresh database where the table has not been
    created yet. Database errors are printed rather than raised so the notebook keeps
    running; the connection is always closed (an uncommitted transaction is discarded).
    """
    conn = None
    try:
        conn = psycopg2.connect(dbname=db_name, user=db_username, password=db_password,
                                host=db_host, port=db_port)
        # The cursor context manager closes the cursor; the transaction is committed explicitly.
        with conn.cursor() as cursor:
            cursor.execute("DROP TABLE IF EXISTS edgar_selected_tickers_table;")
        conn.commit()
        print("\nTable edgar_selected_tickers_table was deleted........\n")
    except psycopg2.Error as e:
        print("Error:", e)
    finally:
        if conn is not None:
            conn.close()

In [8]:
def create_table_inRDS(db_name, db_username, db_password, db_host, db_port):
    """Create ``edgar_selected_tickers_table`` in the Postgres database if it does not exist.

    The column order mirrors the dict keys produced by ``process_json`` (``end`` -> ``end_date``),
    which matters because the bulk ``COPY`` loader maps CSV fields to columns by position.
    ``val`` is ``DOUBLE PRECISION``: Postgres ``REAL`` keeps only about 6 significant digits
    and would silently round amounts in the billions.

    Database errors are printed rather than raised; the connection is always closed.
    """
    sql_query = """
        CREATE TABLE IF NOT EXISTS edgar_selected_tickers_table (
            end_date DATE,
            ticker TEXT,
            reporting_type TEXT,
            form TEXT,
            type TEXT,
            units TEXT,
            val DOUBLE PRECISION
        );
    """

    conn = None
    try:
        conn = psycopg2.connect(dbname=db_name, user=db_username, password=db_password,
                                host=db_host, port=db_port)
        with conn.cursor() as cursor:
            cursor.execute(sql_query)
        conn.commit()
        print("\nTable edgar_selected_tickers_table was/is created........\n")
    except psycopg2.Error as e:
        print("Error:", e)
    finally:
        if conn is not None:
            conn.close()

In [9]:
def load_batch_into_database(batch_data, db_name, db_username, db_password, db_host, db_port):
    """Bulk-load a list of dicts into ``edgar_selected_tickers_table`` using ``COPY``.

    The dicts are serialized to an in-memory CSV (header taken from the first dict's keys)
    and streamed through ``cursor.copy_expert``. ``COPY`` without a column list maps CSV
    fields to table columns by position and skips the header line, so the dict key order
    must match the table's column order. ``None`` and ``""`` are both written as empty
    unquoted fields, which ``NULL ''`` loads as SQL NULL.

    An empty batch is a no-op. Database errors are printed rather than raised; the
    connection is always closed, discarding any uncommitted data.
    """
    if not batch_data:
        print("\nNo batch data to load.\n")
        return

    # Build the CSV before connecting so a serialization error cannot leave a connection open.
    csv_buffer = io.StringIO()
    csv_writer = csv.DictWriter(csv_buffer, fieldnames=list(batch_data[0].keys()))
    csv_writer.writeheader()
    csv_writer.writerows(batch_data)
    csv_buffer.seek(0)

    copy_sql = "COPY edgar_selected_tickers_table FROM STDIN WITH CSV HEADER DELIMITER ',' NULL ''"

    conn = None
    try:
        conn = psycopg2.connect(dbname=db_name, user=db_username, password=db_password,
                                host=db_host, port=db_port)
        with conn.cursor() as cursor:
            cursor.copy_expert(copy_sql, csv_buffer)
        conn.commit()
        print("\nbatch data was loaded with success! \n")
    except psycopg2.Error as e:
        print("Error:", e)
    finally:
        if conn is not None:
            conn.close()

In [9]:
data_to_load = process_json(anomaly_tickers_list)
delete_table_inRDS(db_name, db_username, db_password, db_host, db_port)
create_table_inRDS(db_name, db_username, db_password, db_host, db_port)
load_batch_into_database(data_to_load, db_name, db_username, db_password, db_host, db_port)


Tickers processed: ['AIG', 'AMNA', 'AMND', 'AMTR', 'AMUB', 'BDCX', 'BDCZ', 'BMY', 'AMJ', 'AAPL', 'BAC', 'BAC-PL', 'BAC-PM', 'BAC-PN', 'BAC-PO', 'BAC-PP', 'BAC-PQ', 'BAC-PS', 'BACRP', 'AMZN', 'AMD', 'BJRI']

Missing files: ['AIG-PA', 'FNMA', 'FNMAS', 'FNMAH', 'FNMFN', 'FNMAJ', 'FNMAM', 'FNMAN', 'FNMAI', 'FNMAT', 'FNMAK', 'FNMAL', 'FNMFM', 'FNMAO', 'FNMAG', 'FNMFO', 'FNMAP', 'LUMN', 'WFC', 'WFC-PY', 'WFC-PR', 'WFC-PL', 'WFC-PC', 'WFC-PD', 'WFC-PZ', 'WFCNP', 'WFC-PA', 'DB', 'AGATF', 'DEENF', 'ADZCF', 'OLOXF', 'DGP', 'DGZ', 'DZZ', 'USML', 'WUCT', 'UCIB', 'PFFL', 'QULL', 'SCDL', 'SMHB', 'MLPR', 'MTUL', 'MVRL', 'IWML', 'MLPB', 'HDLB', 'IFED', 'IWDL', 'IWFL', 'ESUS', 'FBGX', 'FEDL', 'CEFD', 'DJCB', 'BMYMP', 'CELG-RI', 'C', 'C-PJ', 'C-PN', 'JPM', 'JPM-PC', 'JPM-PD', 'JPM-PJ', 'JPM-PK', 'JPM-PL', 'JPM-PM', 'GSK', 'GLAXF', 'ATMP', 'AYTEF', 'BALTF', 'BWVTF', 'COWTF', 'VXX', 'VXZ', 'PGMFF', 'SGGFF', 'JJTFF', 'JJUFF', 'PGDDF', 'JJGTF', 'JJMTF', 'JJNTF', 'JJOFF', 'JJPFF', 'JJSSF', 'ICITF', 'JEMTF',

##### Option 2: Get results for a given number of tickers

In [None]:
def get_response_from_s3(s3, s3_bucket, s3_prefix_path, max_keys=None):
    """List the objects stored under a key prefix of an S3 bucket (used only by option 2).

    Parameters
    ----------
    s3 :
        boto3 S3 client.
    s3_bucket : str
        Bucket name.
    s3_prefix_path : str
        Key prefix to list, e.g. ``"egdar_data/raw/"``.
    max_keys : int, optional
        Stop once this many objects have been collected; ``None`` lists everything.

    Returns
    -------
    list of dict
        The ``Contents`` entries (each carries a ``"Key"``) across all result pages. A single
        listing call returns at most 1,000 entries, so the ``list_objects_v2`` paginator is
        used to go past that limit. Returns ``[]`` if listing fails (the error is logged), so
        callers can always slice or iterate the result.
    """
    contents = []
    try:
        paginator = s3.get_paginator("list_objects_v2")
        for page in paginator.paginate(Bucket=s3_bucket, Prefix=s3_prefix_path):
            contents.extend(page.get("Contents", []))
            if max_keys is not None and len(contents) >= max_keys:
                return contents[:max_keys]
        return contents
    except Exception as e:
        logger.exception(f"An unexpected error occurred: {e}")
        return []

In [None]:
# Number of JSON files (one per ticker) to process from under `s3_prefix_path`.
number_of_tickers = 1

# `or []` guards against a failed listing. Only ".json" keys are data files; this skips
# "folder" placeholder keys (ending in "/") that would otherwise fail to parse as JSON.
objs = [
    obj
    for obj in (get_response_from_s3(s3, s3_bucket, s3_prefix_path) or [])
    if obj["Key"].endswith(".json")
][:number_of_tickers]

all_records = []  # one dict per reported us-gaap value, across all processed files

for obj in objs:
    file_key = obj["Key"]
    # ".../<TICKER>.json" -> "<TICKER>"; removing only the suffix keeps dotted tickers
    # such as "BRK.B" intact.
    ticker = file_key.rsplit("/", 1)[-1].removesuffix(".json")

    # The objects are plain UTF-8 JSON (not gzip-compressed).
    response = s3.get_object(Bucket=s3_bucket, Key=file_key)
    data = json.loads(response["Body"].read().decode("utf-8"))

    # Check for the us-gaap facts BEFORE indexing into them; tickers that report under
    # another taxonomy (e.g. ifrs-full) would otherwise raise KeyError.
    us_gaap_facts = data.get("facts", {}).get("us-gaap")
    if us_gaap_facts is None:
        print(f"No 'us-gaap' key for {ticker}")
        continue

    # The data is organized by account; each account holds a list of records per unit.
    for acc_name, acc_values in us_gaap_facts.items():
        for unit, unit_records in acc_values.get("units", {}).items():
            for record in unit_records:
                record["reporting_type"] = "us-gaap"  # same field option 1 adds
                record["units"] = unit
                record["type"] = acc_name
                record["ticker"] = ticker
                all_records.append(record)
