### Test the environment and create a folder to store results

In [11]:
import os, pathlib
from dotenv import load_dotenv

# Make sure the output directory exists before any later cell writes to it.
os.makedirs("Out", exist_ok=True)
print("Out/ ready.")

# Load variables from the local .env file, then report only whether the key
# is visible -- never echo the secret itself.
load_dotenv()
print("Env loaded. API_KEY present? ", bool(os.environ.get("API_KEY")))
print("Kernel OK.")

Out/ ready.
Env loaded. API_KEY present?  True
Kernel OK.


### Test API Key and Search Engine ID

In [12]:
# Credentials and the search-engine id are read from the process environment.
API_KEY = os.environ.get("API_KEY")
CX_ID = os.environ.get("CX_ID")
GOOGLE_API = 'https://www.googleapis.com/customsearch/v1'

# Fail fast when either value is missing or empty.
assert all((API_KEY, CX_ID)), "Set API_KEY and CX_ID in your .env file"
print("OK, keys loaded.")

OK, keys loaded.


### Setup Queries and Pages

In [13]:
import json


# Raw environment settings: each is a string, or None when the variable is unset.
PAGES = os.environ.get("PAGES")
FOLLOWER_MIN = os.environ.get("FOLLOWER_MIN")

# JSON files that hold the building blocks of the search queries.
SITES = pathlib.Path("queries", "sites.json")
CITIES = pathlib.Path("queries", "cities.json")
VIEWS = pathlib.Path("queries", "views.json")
PATTERNS = pathlib.Path("queries", "patterns.json")
KEYWORDS = pathlib.Path("queries", "keywords.json")

def create_queries() -> dict[str, list[str]]:
    """
    Build the search queries for every configured site.

    Reads the SITES, CITIES and KEYWORDS JSON files and combines
    ``sites["social_media"]`` with ``keywords["nail"]`` and
    ``cities["top_major_cities"]``.

    :return: mapping of site -> list of "<site> <keyword> <city>" strings,
             keyword-major (all cities for the first keyword, then the next).
    :raises OSError: if one of the JSON files cannot be opened.
    :raises KeyError: if an expected top-level key is missing from a file.
    """
    def _load(path):
        # JSON is UTF-8 by specification; do not rely on the platform default
        # encoding, which would garble non-ASCII city names on some systems.
        with open(path, encoding="utf-8") as handle:
            return json.load(handle)

    sites = _load(SITES)
    cities = _load(CITIES)
    keywords = _load(KEYWORDS)

    # modify this part to get different filter combos
    return {
        site: [
            f"{site} {keyword} {city}"
            for keyword in keywords["nail"]
            for city in cities["top_major_cities"]
        ]
        for site in sites["social_media"]
    }

# Build the per-site query lists once, when this cell runs (reads the JSON
# files referenced by SITES, CITIES and KEYWORDS).
QUERY = create_queries()

### Parse URLs and gather data with patterns and queries

In [14]:
import re
import requests
from urllib.parse import urlparse

def customized_search(query: str, start: int = 1):
    """
    Send one GET request to GOOGLE_API and return the decoded JSON body.

    :param query: search string sent as the ``q`` parameter
    :param start: value sent as the ``start`` parameter
    :return: the parsed JSON response
    :raises requests.HTTPError: when the response has an error status
    """
    params = {
        "key": API_KEY,
        "cx": CX_ID,
        "q": query,
        "start": start,
    }
    response = requests.get(GOOGLE_API, params=params, timeout=30)
    response.raise_for_status()
    return response.json()

def gather_raw_data(query: dict[str, list[str]], search=None) -> dict[str, list[dict]]:
    """
    Run every query of every site and collect the returned result items.

    Each site maps to a *list* of query strings, so one search is issued per
    string and the resulting "items" lists are concatenated in query order.

    :param query: mapping of site -> list of query strings (a single string
                  is also accepted and treated as a one-element list)
    :param search: callable taking a query string and returning a response
                   dict; defaults to ``customized_search``
    :return: mapping of site -> list of result items gathered for that site
    """
    if search is None:
        search = customized_search

    raw_data = {}
    for site, filters in query.items():
        if isinstance(filters, str):
            filters = [filters]

        items = []
        for single_query in filters:
            # A response without an "items" key contributes no results.
            items.extend(search(single_query).get("items", []))
        raw_data[site] = items
    return raw_data

In [15]:
def parse_ig_url(url: str) -> tuple[str, str]:
    """
    Classify a URL by the first segment of its path.

    :param url: the result link to classify
    :return: ``(url, "p")`` when the first path segment is ``p`` (a post),
             ``(url, "a")`` when it is anything not in the reserved list
             (treated as an account page), and ``("none", "none")`` for the
             other reserved sections or when the path has no segment at all.
    """
    reserved = {"p", "reel", "reels", "stories", "explore", "tv", "channels", "direct"}

    # The path starts with "/", so a plain split yields an empty first
    # element; drop empty pieces to reach the real first segment.
    segments = [part for part in urlparse(url).path.split("/") if part]
    if not segments:
        return "none", "none"

    head = segments[0].lower()
    if head == "p":
        return url, "p"
    if head in reserved:
        return "none", "none"
    return url, "a"


def parse_nums(m):
    """
    Convert a regex match of ``(number, suffix)`` into an integer count.

    :param m: match object with exactly two groups -- the numeric text
              (may contain thousands commas and a decimal point) and an
              optional K/M/B suffix in any case -- or a falsy value
    :return: the count as an int; 0 when ``m`` is falsy or the numeric text
             is not a valid number (e.g. "1.2.3")
    """
    if not m:
        return 0

    num_str, suffix = m.groups()
    try:
        num = float(num_str.replace(',', ''))
    except ValueError:
        # The capturing pattern allows several dots; treat such text as
        # "no count" instead of aborting the whole run.
        return 0

    mult = {
        'K': 1_000,
        'M': 1_000_000,
        'B': 1_000_000_000
    }
    # round() rather than int(): products such as 1.005 * 1000 land just
    # below the intended integer and would otherwise be truncated.
    return int(round(num * mult.get((suffix or '').upper(), 1)))


def parse_ig_followers(raw):
    """Return the count that precedes the word 'followers' in *raw*, or 0 if none is found."""
    followers_re = re.compile('(\\d[\\d,.]*)\\s*(K|M|B)?\\s+followers', re.I)
    return parse_nums(followers_re.search(raw))


def parse_ig_posts(raw):
    """Return the count that precedes the word 'posts' in *raw*, or 0 if none is found."""
    posts_re = re.compile('(\\d[\\d,.]*)\\s*(K|M|B)?\\s+posts', re.I)
    return parse_nums(posts_re.search(raw))

def parse_ig_following(raw):
    """Return the count that precedes the word 'following' in *raw*, or 0 if none is found."""
    following_re = re.compile('(\\d[\\d,.]*)\\s*(K|M|B)?\\s+following', re.I)
    return parse_nums(following_re.search(raw))

def trim_after_posts(raw: str) -> str:
    """
    Return the text that follows the first whole word 'post'/'posts'.

    Non-breaking spaces are converted to regular spaces first. When the word
    does not occur, the normalized text is returned stripped. Delimiters
    directly after the word (whitespace, dots, dashes, colon, bullets, pipe)
    are dropped from the front of the result.

    :param raw: snippet text; a falsy value yields ""
    :return: the trimmed remainder
    """
    if not raw:
        return ""

    normalized = raw.replace("\xa0", " ")

    marker = re.search(r'\bposts?\b', normalized, flags=re.IGNORECASE)
    if marker is None:
        return normalized.strip()

    remainder = normalized[marker.end():]
    return re.sub(r'^[\s\.\-–—:•·|]+', '', remainder).strip()

def clean_instagram_title(title: str) -> str:
    """
    Remove the Instagram suffix from a result title.

    Cuts everything from a separator followed by 'Instagram' to the end of
    the title (which also covers suffixes truncated with an ellipsis), and a
    trailing '(Instagram)'. Whitespace is collapsed and dangling separator
    characters are trimmed from both ends.

    :param title: the raw title; a falsy value yields ""
    :return: the cleaned title
    """
    if not title:
        return ""

    # Collapse every whitespace run (including non-breaking spaces) to one space.
    cleaned = re.sub(r"\s+", " ", title.replace("\xa0", " ").strip())

    suffix_patterns = (
        # [separator] Instagram ... [to end]; separators: • · - – — | &bull;
        r'\s*(?:[•·\-\–\—|]|&bull;)\s*Instagram\b.*$',
        # rare '(Instagram)' suffix
        r'\s*\(\s*Instagram\s*\)\s*$',
    )
    for suffix in suffix_patterns:
        cleaned = re.sub(suffix, '', cleaned, flags=re.IGNORECASE)

    return cleaned.strip(' -–—|·•').strip()


# Minimum follower count an account needs to be kept. Taken from the
# FOLLOWER_MIN environment variable when it holds an integer; otherwise the
# previous hard-coded default of 2000 applies.
try:
    FOLLOWER_THRESHOLD = int(os.getenv("FOLLOWER_MIN", ""))
except ValueError:
    FOLLOWER_THRESHOLD = 2000


def parse_and_filter_ig_data(raw_data):
    """
    Turn raw 'instagram.com' search items into a dict of account records.

    An item is kept only when ``parse_ig_url`` classifies its link as an
    account ("a") and the follower count parsed from its snippet is at least
    FOLLOWER_THRESHOLD. Posts and every other kind of link are skipped.

    :param raw_data: mapping of site -> list of result items; each item is a
                     dict with 'link' and, normally, 'title' and 'snippet'
    :return: mapping of cleaned title -> record with the keys 'type', 'url',
             'follower', 'following', 'post' and 'description'. Items that
             share a cleaned title overwrite one another (the last one wins).
    """
    result = {}

    # A missing or empty site entry simply yields no records.
    for item in raw_data.get('instagram.com') or []:
        link = item.get('link')
        if not link:
            continue

        url, kind = parse_ig_url(link)
        if kind != 'a':
            # posts and unrecognised link types are not handled for now
            continue

        # Not every result item is guaranteed to carry a snippet.
        snippet = item.get('snippet', '')
        follower = parse_ig_followers(snippet)
        if follower < FOLLOWER_THRESHOLD:
            continue

        result[clean_instagram_title(item.get('title', ''))] = {
            'type': 'a',
            'url': url,
            'follower': follower,
            'following': parse_ig_following(snippet),
            'post': parse_ig_posts(snippet),
            'description': trim_after_posts(snippet),
        }
    return result

In [16]:
import pandas as pd


# df = pd.DataFrame.from_dict(data, orient="index")

In [17]:
import csv

def save_profiles_to_csv(data: dict, path: str = "instagram_profiles.csv"):
    """
    Write a ``{title: {...fields...}}`` mapping to a UTF-8 CSV file.

    The known columns come first in a fixed order; any other key found in
    the records is appended as an extra column, sorted by name. Fields a
    record lacks are written as empty strings.

    :param data: mapping of title -> record dict (a falsy record yields a
                 row with only the title filled in)
    :param path: destination file; overwritten if it exists
    """
    base_fields = ["title", "type", "url", "follower", "following", "post", "description"]

    # One flat row per profile, with the title as the first column.
    records = []
    for title, info in data.items():
        record = {"title": title}
        record.update(info or {})
        records.append(record)

    # Any key outside the known columns becomes an extra column.
    extras = {key for record in records for key in record if key not in base_fields}
    fieldnames = base_fields + sorted(extras)

    with open(path, "w", newline="", encoding="utf-8") as out_file:
        # restval fills in the columns a given record does not have.
        writer = csv.DictWriter(out_file, fieldnames=fieldnames, restval="")
        writer.writeheader()
        writer.writerows(records)

# Example:
# save_profiles_to_csv(data, "profiles.csv")

In [None]:
# NOTE(review): this cell only prints a fixed greeting; it looks like a
# leftover scratch/smoke-test cell -- confirm whether it can be removed.
print("Hello World!")
