In [1]:
import psycopg2
import psycopg2.extras
import requests
import logging
import time
from ratelimit import limits, sleep_and_retry
from urllib.parse import quote
import os
from dotenv import load_dotenv
import json


In [2]:
load_dotenv()
API_KEY ='QqptYKkMpzTBN5q266dKpA'

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    filename='./apollo_sync.log'
)

In [3]:
# Database Insertion Function
def insert_data(data_tuples):
    try:
        with psycopg2.connect(
            host = 'whitewolfden.myddns.me',
            database = 'abhinav-dev',
            user = 'kingslayer',
            password = '4Khc5D8+6T.3',
            port = '6204'
        ) as conn:
            
            with conn.cursor() as cur:
                insert_query = """
                INSERT INTO solar_contacts_without_salesforce_founder
                (id, first_name, last_name, linkedin, title, email, email_status, id_tracker,
                website, company_linkedin, work_history)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s::JSONB[])
                ON CONFLICT (id) DO UPDATE SET
                first_name = EXCLUDED.first_name,
                last_name = EXCLUDED.last_name,
                linkedin = EXCLUDED.linkedin,
                title = EXCLUDED.title,
                email = EXCLUDED.email,
                email_status = EXCLUDED.email_status,
                id_tracker = EXCLUDED.id_tracker,
                website = EXCLUDED.website,
                company_linkedin = EXCLUDED.company_linkedin,
                work_history = EXCLUDED.work_history;
                """

                formatted_data = [
                    (*t[:-1], [json.dumps(r) for r in t[-1]])
                    for t in data_tuples
                ]

                psycopg2.extras.execute_batch(cur, insert_query, formatted_data)
                conn.commit()

    except Exception as error:
        print(error)
    finally:
        if conn is not None:
            conn.close()

In [None]:
# Update the status 
def status_updater():
    with psycopg2.connect(
            host = 'whitewolfden.myddns.me',
            database = 'abhinav-dev',
            user = 'kingslayer',
            password = '4Khc5D8+6T.3',
            port = '6204'
        ) as conn:
         with conn.cursor() as cur:
             update_query=(f"UPDATE solar_contacts_without_salesforce SET completed = TRUE WHERE website = ?",(website,))
    conn.close()
             
    

In [4]:
# Apollo API Calls

@sleep_and_retry
@limits(calls=150, period=60)
def people_search_api(company_url, current_page):

    base_url = "https://api.apollo.io/api/v1/mixed_people/search"

    default_params = {
        'q_organization_domains': company_url,
        'page': current_page,
        'per_page': 100
    }

    # Convert params to URL-encoded string
    query_string = '&'.join(f"{quote(str(k))}={quote(str(v))}" for k, v in default_params.items())
    url = f"{base_url}?{query_string}"

    headers = {
        "accept": "application/json",
        "Cache-Control": "no-cache",
        "Content-Type": "application/json",
        "x-api-key": API_KEY
    }

    try:
        response = requests.post(url, headers=headers)
        response.raise_for_status()
        data = response.json()
        logging.info(f'API request successful for {company_url} page {current_page}')
        return data
    except requests.exceptions.RequestException as e:
        print(e)
        return None

In [5]:
# Extract Data From Response From Contact
def extract_data_contact(response, contact_variable):
    data_tuples = []
    try:
        for i in range(len(response[contact_variable])):
            id = response[contact_variable][i]['id']
            first_name = response[contact_variable][i]['first_name']
            last_name = response[contact_variable][i]['last_name']
            linkedIn = response[contact_variable][i]['linkedin_url']
            title = response[contact_variable][i]['title']
            email = response[contact_variable][i]['email']
            email_status = response[contact_variable][i]['email_status']
            id_tracker = 'contact'
            website = response[contact_variable][i]['account']['website_url']
            company_linkedin = response[contact_variable][i]['account']['linkedin_url']
    
        records = []
        for j in range(len(response[contact_variable][i]['employment_history'])):
            record = {
                'company_name': response[contact_variable][i]['employment_history'][j]['organization_name'],
                'org_id': response[contact_variable][i]['employment_history'][j]['_id'],
                'title': response[contact_variable][i]['employment_history'][j]['title'],
                'start_date': response[contact_variable][i]['employment_history'][j]['start_date'],
                'end_date': response[contact_variable][i]['employment_history'][j]['end_date'],
            }
            records.append(record)
        thistuple = (id, first_name, last_name, linkedIn, title, email, email_status, id_tracker, website, company_linkedin, records)
        data_tuples.append(thistuple)
        if contact:
            status_updater(website)
    except Exception as error:
        print(error)
    
    return data_tuples

In [6]:
# Extract Data From Response From People
def extract_data_people(response, contact_variable):
    data_tuples = []
    try:
        for i in range(len(response[contact_variable])):
            id = response[contact_variable][i]['id']
            first_name = response[contact_variable][i]['first_name']
            last_name = response[contact_variable][i]['last_name']
            linkedIn = response[contact_variable][i]['linkedin_url']
            title = response[contact_variable][i]['title']
            email = response[contact_variable][i]['email']
            email_status = ''
            id_tracker = 'people'
            website = response[contact_variable][i]['account']['website_url']
            company_linkedin = response[contact_variable][i]['account']['linkedin_url']
    
        records = []
        for j in range(len(response[contact_variable][i]['employment_history'])):
            record = {
                'company_name': response[contact_variable][i]['employment_history'][j]['organization_name'],
                'org_id': response[contact_variable][i]['employment_history'][j]['_id'],
                'title': response[contact_variable][i]['employment_history'][j]['title'],
                'start_date': response[contact_variable][i]['employment_history'][j]['start_date'],
                'end_date': response[contact_variable][i]['employment_history'][j]['end_date'],
            }
            records.append(record)
        thistuple = (id, first_name, last_name, linkedIn, title, email, email_status, id_tracker, website, company_linkedin, records)
        data_tuples.append(thistuple)
        if people:
            status_updater(website)
    except Exception as error:
        print(error)
    
    return data_tuples

In [7]:
# Operating Apollo API
def calling_api(company_url, current_page):
    response = people_search_api(company_url, current_page)
    if response:
        current_page = int(response['pagination']['page'])
        total_pages = response['pagination']['total_pages']

    if response['contacts']:
        data_tuple = extract_data_contact(response, 'contacts')
        insert_data(data_tuple)
        

    if response['people']:
        data_tuple = extract_data_people(response, 'people')
        insert_data(data_tuple)

    if (total_pages > 1) and (current_page < total_pages): 
        calling_api(company_url, current_page + 1)

In [8]:
# Getting the List of URL from Database
def get_urls():
    conn = None
    try:
        with psycopg2.connect(
            host = 'whitewolfden.myddns.me',
            database = 'abhinav-dev',
            user = 'kingslayer',
            password = '4Khc5D8+6T.3',
            port = '6204'
        ) as conn:
            
            with conn.cursor() as cur:
                cur.execute("SELECT website FROM solar_companies_without_salesforce ORDER BY website ASC")
                table_content = cur.fetchall()
                table_content = [row[0] for row in table_content]
        
        return table_content
    except Exception as error:
        print(error)
        return[]
    finally:
        if conn is not None:
            conn.close()

In [9]:
# Main Function
def main():
    urls = get_urls()
    if urls is not None and len(urls) > 0:
        try:
            for url in urls:
                print(url)
                calling_api(url, 1)
                logging.info(f'Completed processing ...')
        except Exception as e:
            # logging.error(f'Main process error: {e}')
            print(e)

if __name__ == "__main__":
    main()

http://www.11millionacres.com
http://www.127energy.com
http://www.1876energy.com
http://www.1solar.com
http://www.1sourcesolar.com
http://www.1starenergy.com
http://www.1stcoastrecycling.net
http://www.2etc.com
http://www.350bayarea.org
http://www.3bsforestry.com
http://www.3xm.biz
http://www.400brand.info
http://www.4ctechnologies.com
http://www.4ocean.com
http://www.512solar.com
http://www.603solar.com
http://www.93energy.com
http://www.9flagssolar.com
http://www.a1energy.net
http://www.a1organics.com
http://www.aawems.com
http://www.abound.com
http://www.aboutsavingheat.com
http://www.abpelectrical.com
http://www.absolutesolar.com
'account'
http://www.acba.africa
http://www.accordpower.com
http://www.acdcsolar.com
http://www.acelfil.com
http://www.acespace.org
http://www.achievesolarflorida.com
http://www.acrcd.org
http://www.acrsolar.com
http://www.actgroup.com.pk
http://www.actionsolar.com
http://www.activepowersolutions.com
http://www.activesurfaces.xyz
http://www.acuitypower.com