In [1]:
import cloudflare
import re
import boto3
import pandas as pd
import os

def get_load_balancer_details():
    elbv2_client = boto3.client('elbv2')
    ec2_client = boto3.client('ec2')

    # Get all VPCs with pagination
    vpcs = []
    vpc_paginator = ec2_client.get_paginator('describe_vpcs')
    for page in vpc_paginator.paginate():
        vpcs.extend(page['Vpcs'])

    vpc_dict = {vpc['VpcId']: vpc.get('Tags', [{'Key': 'Name', 'Value': 'N/A'}])[0]['Value'] for vpc in vpcs}

    results = []
    marker = None

    while True:
        # Get list of load balancers with pagination
        if marker:
            load_balancers = elbv2_client.describe_load_balancers(Marker=marker)
        else:
            load_balancers = elbv2_client.describe_load_balancers()

        for lb in load_balancers['LoadBalancers']:
            lb_arn = lb['LoadBalancerArn']
            lb_name = lb['LoadBalancerName']

            # Check if security group is attached
            has_security_group = len(lb.get('SecurityGroups', [])) > 0

            # Get load balancer type
            lb_type = lb['Type']

            # Check if the load balancer is public
            is_public = lb['Scheme'] == 'internet-facing'

            # Get VPC ID and Name
            vpc_id = lb['VpcId']
            vpc_name = vpc_dict.get(vpc_id, 'Unknown')

            # Get tags for the load balancer
            tags = elbv2_client.describe_tags(ResourceArns=[lb_arn])['TagDescriptions'][0]['Tags']
            
            # Create a dictionary of tags
            tag_dict = {tag['Key']: tag['Value'] for tag in tags}

            results.append({
                'Load Balancer Name': lb_name,
                'Load Balancer ARN': lb_arn,
                'Has Security Group': 'Yes' if has_security_group else 'No',
                'Load Balancer Type': lb_type,
                'Is Public': 'Yes' if is_public else 'No',
                'VPC ID': vpc_id,
                'VPC Name': vpc_name,
                'DNS Name': lb['DNSName'],
                'Tags': tag_dict
            })

        if 'NextMarker' in load_balancers:
            marker = load_balancers['NextMarker']
        else:
            break

    return results

key = os.environ.get('CLOUDFLARE_API_TOKEN')
print(key)

None


In [2]:
# Run the function to get load balancer details
lb_details = get_load_balancer_details()

# Create a pandas DataFrame
lbdf = pd.DataFrame(lb_details)
# Display the DataFrame in the notebook
lbdf

In [3]:
# Export to CSV
prefix="staging-cs"

In [4]:

csv_filename = f'{prefix}-load_balancer_config.csv'
lbdf.to_csv(csv_filename, index=False)

print(f"Load Balancer configuration details exported to {csv_filename}")



Load Balancer configuration details exported to staging-cs-load_balancer_config.csv


In [5]:
def get_api_gateway_details():
    api_client = boto3.client('apigateway')
    waf_client = boto3.client('wafv2')

    results = []
    custom_domains = {}

    # Get all custom domain names
    paginator = api_client.get_paginator('get_domain_names')
    for page in paginator.paginate():
        for domain in page['items']:
            domain_name = domain['domainName']
            custom_domains[domain_name] = {
                'certificate_arn': domain.get('certificateArn', 'N/A'),
                'regional_domain_name': domain.get('regionalDomainName', 'N/A'),
                'regional_hosted_zone_id': domain.get('regionalHostedZoneId', 'N/A'),
                'mappings': []
            }

    # Get base path mappings for each custom domain
    for domain_name in custom_domains:
        try:
            mappings = api_client.get_base_path_mappings(domainName=domain_name)
            for mapping in mappings.get('items', []):
                custom_domains[domain_name]['mappings'].append({
                    'base_path': mapping.get('basePath', '(none)'),
                    'api_id': mapping['restApiId'],
                    'stage': mapping['stage']
                })
        except api_client.exceptions.NotFoundException:
            # No mappings found for this domain
            pass

    # Get list of all API Gateways
    paginator = api_client.get_paginator('get_rest_apis')
    for page in paginator.paginate():
        for api in page['items']:
            api_id = api['id']
            api_name = api['name']
            print(api_name)

            # Get API stages
            stages = api_client.get_stages(restApiId=api_id)['item']

            for stage in stages:
                stage_name = stage['stageName']
                print(stage_name)

                # Check for authorizer configurations
                authorizers = api_client.get_authorizers(restApiId=api_id)['items']
                has_authorizer = len(authorizers) > 0

                # Check for WAF Web ACL
                waf_acl = None
                try:
                    waf_acl = api_client.get_stage(restApiId=api_id, stageName=stage_name)['webAclArn']
                except KeyError:
                    pass

                # Get API Gateway type
                api_type = api.get('endpointConfiguration', {}).get('types', ['Unknown'])[0]

                # Get the invoke URL
                invoke_url = f"https://{api_id}.execute-api.{api_client.meta.region_name}.amazonaws.com/{stage_name}"

                # Find associated custom domains
                associated_domains = []
                for domain, details in custom_domains.items():
                    for mapping in details['mappings']:
                        if mapping['api_id'] == api_id and mapping['stage'] == stage_name:
                            associated_domains.append({
                                'domain_name': domain,
                                'base_path': mapping['base_path'],
                                'regional_domain_name': details['regional_domain_name']
                            })

                results.append({
                    'API Name': api_name,
                    'API ID': api_id,
                    'Stage': stage_name,
                    'Has Authorizer': has_authorizer,
                    'WAF Web ACL': 'Yes' if waf_acl else 'No',
                    'API Gateway Type': api_type,
                    'Invoke URL': invoke_url,
                    'Associated Custom Domains': associated_domains
                })

    return results

# Run the function to get API Gateway details
api_gateway_details = get_api_gateway_details()

# Create a pandas DataFrame
apidf = pd.DataFrame(api_gateway_details)

# Display the DataFrame in the notebook
apidf

mso-connect-dynamics-api
v0
dynamicsUpdateConnectAttributes
test


Unnamed: 0,API Name,API ID,Stage,Has Authorizer,WAF Web ACL,API Gateway Type,Invoke URL,Associated Custom Domains
0,mso-connect-dynamics-api,na7g8rxpk4,v0,False,No,EDGE,https://na7g8rxpk4.execute-api.us-east-1.amazo...,[]
1,dynamicsUpdateConnectAttributes,rvxwcopyx9,test,False,No,REGIONAL,https://rvxwcopyx9.execute-api.us-east-1.amazo...,[]


In [6]:
# Export to CSV
csv_filename = f'{prefix}-api_gateway_config.csv'
apidf.to_csv(csv_filename, index=False)

print(f"API Gateway configuration details exported to {csv_filename}")

API Gateway configuration details exported to staging-cs-api_gateway_config.csv


In [7]:
import pandas as pd
import re

from cloudflare import Cloudflare
from typing import List, Dict, Any

def get_matching_records(zone_ids: List[str], api_gateway_df: pd.DataFrame, load_balancer_df: pd.DataFrame) -> List[Dict[str, Any]]:
    # Initialize Cloudflare client
    cf = Cloudflare(api_token='')

    # Prepare sets of DNS names to match
    api_gateway_dns_names = set()
    custom_domain_names = set()
    load_balancer_dns_names = set()

    # Process API Gateway data if available
    if not api_gateway_df.empty and 'Invoke URL' in api_gateway_df.columns:
        api_gateway_dns_names = set(api_gateway_df['Invoke URL'].apply(lambda x: re.sub(r'^https?://', '', x)))
        
        if 'Associated Custom Domains' in api_gateway_df.columns:
            for domains in api_gateway_df['Associated Custom Domains']:
                if isinstance(domains, list):
                    for domain in domains:
                        custom_domain_names.add(domain['domain_name'])
                        custom_domain_names.add(domain['regional_domain_name'])

    # Process Load Balancer data if available
    if not load_balancer_df.empty and 'DNS Name' in load_balancer_df.columns:
        load_balancer_dns_names = set(load_balancer_df['DNS Name'])

    all_dns_names = api_gateway_dns_names.union(load_balancer_dns_names).union(custom_domain_names)

    results = []

    for zone_id in zone_ids:
        try:
            # Get zone details
            zone = cf.zones.get(zone_id=zone_id)
            zone_name = zone.name if hasattr(zone, 'name') else "Unknown"

            # Get all DNS records for the zone using auto-pagination
            for record in cf.dns.records.list(zone_id=zone_id):
                content = record.content if hasattr(record, 'content') else str(record.get('content', ''))
                name = record.name if hasattr(record, 'name') else str(record.get('name', ''))

                matched_resource = None
                if content in all_dns_names or name in all_dns_names:
                    if content in api_gateway_dns_names or name in api_gateway_dns_names:
                        matched_resource = 'API Gateway'
                    elif content in custom_domain_names or name in custom_domain_names:
                        matched_resource = 'API Gateway (Custom Domain)'
                    elif content in load_balancer_dns_names or name in load_balancer_dns_names:
                        matched_resource = 'Load Balancer'

                if matched_resource:
                    results.append({
                        'Zone ID': zone_id,
                        'Zone Name': zone_name,
                        'Record Type': record.type if hasattr(record, 'type') else record.get('type', 'Unknown'),
                        'Record Name': name,
                        'Record Content': content,
                        'Matched Resource': matched_resource
                    })

        except Exception as e:
            print(f"Error processing zone {zone_id}: {e}")

    return results


In [8]:
# List of Cloudflare zones to check
zones_to_check = ['6c4ab3b69dcfa4bb3d6a7dda25146ba3', '8fd9427a3d318cf4db0e499eba4b78c8']  # Replace with your actual zone names

# Get matching records
matching_records = get_matching_records(zones_to_check, apidf, lbdf)

# Create a DataFrame from the results
results_df = pd.DataFrame(matching_records)

# Display the DataFrame in the notebook
results_df

In [9]:
# Load DataFrames from CSV files
#api_gateway_df = pd.read_csv('api_gateway_config.csv')
#load_balancer_df = pd.read_csv('load_balancer_config.csv')

# Export to CSV
csv_filename = f'{prefix}-cf_recs.csv'
results_df.to_csv(csv_filename, index=False)

print(f"Matching Cloudflare DNS records exported to {csv_filename}")


Matching Cloudflare DNS records exported to staging-cs-cf_recs.csv


In [None]:
# Export to CSV
csv_filename = 'cloudflare_matching_records.csv'
results_df.to_csv(csv_filename, index=False)

print(f"Matching Cloudflare DNS records exported to {csv_filename}")

# Load DataFrames from CSV files
api_gateway_df = pd.read_csv('api_gateway_config.csv')
load_balancer_df = pd.read_csv('load_balancer_config.csv')