# Insert historical Syntage data to DB

This notebook reads data from the Syntage API and... Then, it creates a table within a designated Postgres database and inserts the newest data.


In [1]:
# Import libraries

import datetime
import pprint
import numpy as np
import os
import pandas as pd
import pytz
import sqlalchemy
from IPython.display import display, HTML
from urllib.parse import quote_plus
from dotenv import load_dotenv
load_dotenv()

True

## API call to pull data for all entities


In [2]:
import http.client

SYNTAGE_API_KEY = os.getenv('SYNTAGE_API_KEY')

conn = http.client.HTTPSConnection("api.syntage.com")
payload = ''
headers = {
  'Accept': 'application/ld+json',
  'X-API-Key': SYNTAGE_API_KEY
}
conn.request("GET", "/entities?order%5BcreatedAt%5D=desc&itemsPerPage=1000", payload, headers)
res = conn.getresponse()
data = res.read()
pprint.pp(data.decode("utf-8"))

('{"@context":"\\/contexts\\/Link","@id":"\\/entities","@type":"hydra:Collection","hydra:totalItems":21,"hydra:member":[{"@id":"\\/entities\\/9d2ac062-742c-4942-b7a7-8e9dbf5ca9cd","@type":"Link","id":"9d2ac062-742c-4942-b7a7-8e9dbf5ca9cd","type":"company","name":"AGRITEC '
 'DEL '
 'CENTRO","addedBy":null,"owner":null,"taxpayer":{"@id":"\\/taxpayers\\/ACS921021HJ2","@type":"Taxpayer","id":"ACS921021HJ2","personType":"legal","registrationDate":"1992-10-21 '
 '00:00:00","name":"AGRITEC DEL CENTRO","createdAt":"2024-05-08 '
 '20:22:40","updatedAt":"2024-10-04 '
 '19:17:14"},"credential":{"@id":"\\/credentials\\/9d2ac6b0-2454-4176-bce3-dbcc664f5c31","@type":"Credential","id":"9d2ac6b0-2454-4176-bce3-dbcc664f5c31","type":"ciec","rfc":"ACS921021HJ2","status":"valid","metadata":[],"extraction":null,"createdAt":"2024-10-04 '
 '19:34:36","updatedAt":"2024-10-04 19:34:48","name":"AGRITEC DEL '
 'CENTRO"},"tags":[],"sample":false,"onboardingId":"9d2ac062-729c-47b5-8617-d6ebf0f6152a","createdAt":"

In [3]:
print (SYNTAGE_API_KEY)

03519f3245652ddb6dfc87391b8fdca1


In [None]:
import json
import psycopg2d
from psycopg2 import sql


DB_HOST = os.getenv('DB_HOST')
DB_PORT = os.getenv('DB_PORT')
DB_NAME = os.getenv('DB_NAME')
DB_USER = os.getenv('DB_USER')
DB_PASSWORD = os.getenv('DB_PASSWORD')

# Establish connection to PostgreSQL
def connect_db():
    conn = psycopg2.connect(
        host=DB_HOST,
        port=DB_PORT,
        database=DB_NAME,
        user=DB_USER,
        password=DB_PASSWORD
    )
    return conn


def insert_into_entities(cursor, entity):
    insert_query = """
        INSERT INTO entities (
            id, type, name, added_by, owner, taxpayer_id, taxpayer_person_type, 
            taxpayer_registration_date, taxpayer_name, taxpayer_created_at, 
            taxpayer_updated_at, credential_id, credential_type, credential_rfc, 
            credential_status, credential_created_at, credential_updated_at, 
            tags, sample, onboarding_id, created_at, updated_at
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (id) DO NOTHING;
    """
    
    taxpayer = entity.get('taxpayer', {})
    credential = entity.get('credential', {})
    
    cursor.execute(insert_query, (
        entity.get('id'),
        entity.get('type'),
        entity.get('name'),
        json.dumps(entity.get('addedBy')) if entity.get('addedBy') else None,
        json.dumps(entity.get('owner')) if entity.get('owner') else None,
        taxpayer.get('id'),
        taxpayer.get('personType'),
        taxpayer.get('registrationDate'),
        taxpayer.get('name'),
        taxpayer.get('createdAt'),
        taxpayer.get('updatedAt'),
        credential.get('id'),
        credential.get('type'),
        credential.get('rfc'),
        credential.get('status'),
        credential.get('createdAt'),
        credential.get('updatedAt'),
        json.dumps(entity.get('tags', [])),
        entity.get('sample'),
        entity.get('onboardingId'),
        entity.get('createdAt'),
        entity.get('updatedAt')
    ))


# Insert data into the PostgreSQL table
def insert_into_entities(cursor, entity):
    insert_query = """
        INSERT INTO entities (
            id, type, name, added_by, owner, taxpayer_id, taxpayer_person_type, 
            taxpayer_registration_date, taxpayer_name, taxpayer_created_at, 
            taxpayer_updated_at, credential_id, credential_type, credential_rfc, 
            credential_status, credential_created_at, credential_updated_at, 
            tags, sample, onboarding_id, created_at, updated_at
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (id) DO NOTHING;
    """
    
    cursor.execute(insert_query, (
        entity.get('id'),
        entity.get('type'),
        entity.get('name'),
        json.dumps(entity.get('addedBy')),
        json.dumps(entity.get('owner')),
        entity['taxpayer']['id'],
        entity['taxpayer']['personType'],
        entity['taxpayer']['registrationDate'],
        entity['taxpayer']['name'],
        entity['taxpayer']['createdAt'],
        entity['taxpayer']['updatedAt'],
        entity['credential']['id'],
        entity['credential']['type'],
        entity['credential']['rfc'],
        entity['credential']['status'],
        entity['credential']['createdAt'],
        entity['credential']['updatedAt'],
        json.dumps(entity.get('tags', [])),
        entity.get('sample'),
        entity.get('onboardingId'),
        entity.get('createdAt'),
        entity.get('updatedAt')
    ))

# Function to parse JSON and insert data into the database
def parse_and_insert(json_data):
    # Establish a connection and start transaction
    conn = connect_db()
    cursor = conn.cursor()

    # Loop through each entity in the JSON
    for entity in json_data['hydra:member']:
        insert_into_entities(cursor, entity)
    
    # Commit and close
    conn.commit()
    cursor.close()
    conn.close()

# Example usage
if __name__ == "__main__":
    # Load your JSON data here
    json_str = '''{...}'''  # Replace this with your JSON string
    json_data = json.loads(json_str)
    
    # Parse and insert data into the database
    parse_and_insert(json_data)


In [4]:
import json
import psycopg2
from psycopg2 import sql

# Establish connection to PostgreSQL


def connect_db():
    conn = psycopg2.connect(
        host="blooms-analytics-db.c5e4ooiayxfd.us-east-1.rds.amazonaws.com",
        database="analytics",
        user="blooms_writer",
        password="XNS7TEMceDMQCXH9EnpYdo"
    )
    return conn


def insert_into_entities(cursor, entity):
    insert_query = """
        INSERT INTO entities (
            id, type, name, added_by, owner, taxpayer_id, taxpayer_person_type, 
            taxpayer_registration_date, taxpayer_name, taxpayer_created_at, 
            taxpayer_updated_at, credential_id, credential_type, credential_rfc, 
            credential_status, credential_created_at, credential_updated_at, 
            tags, sample, onboarding_id, created_at, updated_at
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (id) DO NOTHING;
    """

    taxpayer = entity.get('taxpayer', {})
    credential = entity.get('credential', {})

    cursor.execute(insert_query, (
        entity.get('id'),
        entity.get('type'),
        entity.get('name'),
        json.dumps(entity.get('addedBy')) if entity.get('addedBy') else None,
        json.dumps(entity.get('owner')) if entity.get('owner') else None,
        taxpayer.get('id'),
        taxpayer.get('personType'),
        taxpayer.get('registrationDate'),
        taxpayer.get('name'),
        taxpayer.get('createdAt'),
        taxpayer.get('updatedAt'),
        credential.get('id'),
        credential.get('type'),
        credential.get('rfc'),
        credential.get('status'),
        credential.get('createdAt'),
        credential.get('updatedAt'),
        json.dumps(entity.get('tags', [])),
        entity.get('sample'),
        entity.get('onboardingId'),
        entity.get('createdAt'),
        entity.get('updatedAt')
    ))



def insert_into_entities(cursor, entity):
    insert_query = """
        INSERT INTO entities (
            id, type, name, added_by, owner, taxpayer_id, taxpayer_person_type, 
            taxpayer_registration_date, taxpayer_name, taxpayer_created_at, 
            taxpayer_updated_at, credential_id, credential_type, credential_rfc, 
            credential_status, credential_created_at, credential_updated_at, 
            tags, sample, onboarding_id, created_at, updated_at
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (id) DO NOTHING;
    """

    taxpayer = entity.get('taxpayer', {})
    credential = entity.get('credential', {})

    cursor.execute(insert_query, (
        entity.get('id'),
        entity.get('type'),
        entity.get('name'),
        json.dumps(entity.get('addedBy')) if entity.get('addedBy') else None,
        json.dumps(entity.get('owner')) if entity.get('owner') else None,
        taxpayer.get('id'),
        taxpayer.get('personType'),
        taxpayer.get('registrationDate'),
        taxpayer.get('name'),
        taxpayer.get('createdAt'),
        taxpayer.get('updatedAt'),
        credential.get('id'),
        credential.get('type'),
        credential.get('rfc'),
        credential.get('status'),
        credential.get('createdAt'),
        credential.get('updatedAt'),
        json.dumps(entity.get('tags', [])),
        entity.get('sample'),
        entity.get('onboardingId'),
        entity.get('createdAt'),
        entity.get('updatedAt')
    ))

# Function to parse JSON and insert data into the database
def parse_and_insert(json_data):
    # Establish a connection and start transaction
    conn = connect_db()
    cursor = conn.cursor()

    # Loop through each entity in the JSON
    for entity in json_data['hydra:member']:
        insert_into_entities(cursor, entity)

    # Commit and close
    conn.commit()
    cursor.close()
    conn.close()


""" 
# Example usage
if __name__ == "__main__":
    # Load your JSON data here
    json_str = '''{...}'''  # Replace this with your JSON string
    json_data = json.loads(json_str)

    # Parse and insert data into the database
    parse_and_insert(json_data) """

' \n# Example usage\nif __name__ == "__main__":\n    # Load your JSON data here\n    json_str = \'\'\'{...}\'\'\'  # Replace this with your JSON string\n    json_data = json.loads(json_str)\n\n    # Parse and insert data into the database\n    parse_and_insert(json_data) '

In [5]:
json_data = json.loads(data)

# Parse and insert data into the database
parse_and_insert(json_data) 

TypeError: not all arguments converted during string formatting

### Code to parse json of entities

In [6]:
import json

def parse_json_to_dict(data):
    parsed_data = []

    # Iterate through each 'hydra:member' (company entry)
    for entity in data.get("hydra:member", []):
        company_dict = {
            "id": entity.get("id", None),
            "type": entity.get("type", None),
            "name": entity.get("name", None),
            "addedBy": {
                "id": entity.get("addedBy", {}).get("id", None),
                "firstName": entity.get("addedBy", {}).get("firstName", None),
                "lastName": entity.get("addedBy", {}).get("lastName", None),
                "email": entity.get("addedBy", {}).get("email", None),
            } if entity.get("addedBy") else None,
            "owner": {
                "id": entity.get("owner", {}).get("id", None),
                "firstName": entity.get("owner", {}).get("firstName", None),
                "lastName": entity.get("owner", {}).get("lastName", None),
                "email": entity.get("owner", {}).get("email", None),
            } if entity.get("owner") else None,
            "taxpayer": {
                "id": entity.get("taxpayer", {}).get("id", None),
                "personType": entity.get("taxpayer", {}).get("personType", None),
                "registrationDate": entity.get("taxpayer", {}).get("registrationDate", None),
                "name": entity.get("taxpayer", {}).get("name", None),
                "createdAt": entity.get("taxpayer", {}).get("createdAt", None),
                "updatedAt": entity.get("taxpayer", {}).get("updatedAt", None),
            } if entity.get("taxpayer") else None,
            "credential": {
                "id": entity.get("credential", {}).get("id", None),
                "type": entity.get("credential", {}).get("type", None),
                "rfc": entity.get("credential", {}).get("rfc", None),
                "status": entity.get("credential", {}).get("status", None),
                "createdAt": entity.get("credential", {}).get("createdAt", None),
                "updatedAt": entity.get("credential", {}).get("updatedAt", None),
            } if entity.get("credential") else None,
            "onboardingId": entity.get("onboardingId", None),
            "createdAt": entity.get("createdAt", None),
            "updatedAt": entity.get("updatedAt", None),
            "sample": entity.get("sample", None),
            "tags": [tag.get("name", None) for tag in entity.get("tags", [])]
        }
        parsed_data.append(company_dict)

    return parsed_data

# Example usage:
# Assuming your JSON is stored as a string in the variable 'json_string'
# data = json.loads(json_string)
# result = parse_json_to_dict(data)


In [7]:
dd = parse_json_to_dict(json.loads(data))

In [8]:
dd

[{'id': '9d2ac062-742c-4942-b7a7-8e9dbf5ca9cd',
  'type': 'company',
  'name': 'AGRITEC DEL CENTRO',
  'addedBy': None,
  'owner': None,
  'taxpayer': {'id': 'ACS921021HJ2',
   'personType': 'legal',
   'registrationDate': '1992-10-21 00:00:00',
   'name': 'AGRITEC DEL CENTRO',
   'createdAt': '2024-05-08 20:22:40',
   'updatedAt': '2024-10-04 19:17:14'},
  'credential': {'id': '9d2ac6b0-2454-4176-bce3-dbcc664f5c31',
   'type': 'ciec',
   'rfc': 'ACS921021HJ2',
   'status': 'valid',
   'createdAt': '2024-10-04 19:34:36',
   'updatedAt': '2024-10-04 19:34:48'},
  'onboardingId': '9d2ac062-729c-47b5-8617-d6ebf0f6152a',
  'createdAt': '2024-10-04 19:16:58',
  'updatedAt': '2024-10-04 19:34:49',
  'sample': False,
  'tags': []},
 {'id': '9d149df8-e479-4fef-b2c8-f8a9eb7ab511',
  'type': 'company',
  'name': 'SOCIEDAD DE PRODUCCION AGRICOLA MONTICAL',
  'addedBy': None,
  'owner': None,
  'taxpayer': {'id': 'PAM060927VD1',
   'personType': 'legal',
   'registrationDate': '2006-09-27 00:00:00

### Code to insert data into postgres

In [9]:
import psycopg2
from psycopg2.extras import execute_values

def insert_data_into_postgres(parsed_data, conn_params):
    """
    Inserts parsed data into the companies table in the Postgres database.

    Parameters:
    parsed_data (list): List of dictionaries, each representing a company.
    conn_params (dict): Dictionary containing Postgres connection parameters.
        Expected keys: dbname, user, password, host, port.
    """
    # Establishing the connection
    conn = psycopg2.connect(**conn_params)
    cursor = conn.cursor()

    # SQL insert query (bulk insertion using execute_values for efficiency)
    insert_query = """
    INSERT INTO entities (
        id, type, name, added_by_id, added_by_first_name, added_by_last_name, 
        added_by_email, owner_id, owner_first_name, owner_last_name, owner_email, 
        taxpayer_id, taxpayer_person_type, taxpayer_registration_date, 
        taxpayer_name, taxpayer_created_at, taxpayer_updated_at, 
        credential_id, credential_type, credential_rfc, credential_status, 
        credential_created_at, credential_updated_at, onboarding_id, 
        created_at, updated_at, sample, tags
    ) VALUES %s
    ON CONFLICT (id) DO NOTHING;  -- This prevents insertion conflicts on duplicate IDs
    """

    # Prepare the data to be inserted
    values = [
        (
            company.get("id"), 
            company.get("type"),
            company.get("name"),
            company.get("addedBy", {}).get("id") if company.get("addedBy") else None,
            company.get("addedBy", {}).get("firstName") if company.get("addedBy") else None,
            company.get("addedBy", {}).get("lastName") if company.get("addedBy") else None,
            company.get("addedBy", {}).get("email") if company.get("addedBy") else None,
            company.get("owner", {}).get("id") if company.get("owner") else None,
            company.get("owner", {}).get("firstName") if company.get("owner") else None,
            company.get("owner", {}).get("lastName") if company.get("owner") else None,
            company.get("owner", {}).get("email") if company.get("owner") else None,
            company.get("taxpayer", {}).get("id") if company.get("taxpayer") else None,
            company.get("taxpayer", {}).get("personType") if company.get("taxpayer") else None,
            company.get("taxpayer", {}).get("registrationDate") if company.get("taxpayer") else None,
            company.get("taxpayer", {}).get("name") if company.get("taxpayer") else None,
            company.get("taxpayer", {}).get("createdAt") if company.get("taxpayer") else None,
            company.get("taxpayer", {}).get("updatedAt") if company.get("taxpayer") else None,
            company.get("credential", {}).get("id") if company.get("credential") else None,
            company.get("credential", {}).get("type") if company.get("credential") else None,
            company.get("credential", {}).get("rfc") if company.get("credential") else None,
            company.get("credential", {}).get("status") if company.get("credential") else None,
            company.get("credential", {}).get("createdAt") if company.get("credential") else None,
            company.get("credential", {}).get("updatedAt") if company.get("credential") else None,
            company.get("onboardingId"),
            company.get("createdAt"),
            company.get("updatedAt"),
            company.get("sample"),
            company.get("tags")
        ) 
        for company in parsed_data
    ]

    # Perform bulk insertion using execute_values for efficiency
    try:
        execute_values(cursor, insert_query, values)
        conn.commit()
        print(f"Inserted {cursor.rowcount} rows successfully.")
    except Exception as e:
        print(f"Error during insertion: {e}")
        conn.rollback()
    finally:
        cursor.close()
        conn.close()

# Example usage:
# parsed_data = parse_json_to_dict(data)  # Assuming 'data' is the JSON data.
# conn_params = {
#     "dbname": "your_dbname",
#     "user": "your_user",
#     "password": "your_password",
#     "host": "your_host",
#     "port": "your_port"
# }
# insert_data_into_postgres(parsed_data, conn_params)
