# SERP Level Proportions

In [1]:
import pandas as pd
import csv
from collections import Counter
import os
import json
from datetime import datetime

Overview:
1. Label every result on each SERP as non-govt, local, county, state, native, federal, or senator/representative (sen/rep).
2. Calculate how many govt/govt-maintained results appear on each SERP.
3. Calculate how many of the local, county, state, native, and sen/rep results actually match the location of the search.

The difference between (2) and (3) is the error in the localization/targeting of election information.

In [6]:
def domainLocations(path):
    """Load government-domain location metadata from a CSV file.

    path: CSV file with the columns ``domain``, ``place``, ``county``, ``state``,
        ``country``, ``district``, ``multi_state`` and ``type``.

    Returns a dictionary keyed by the ``domain`` column. Each value is a dictionary
    with the keys ``place``, ``county``, ``state``, ``country``, ``district``,
    ``multi_state`` and ``type``. Empty cells are stored as ``None``; a non-empty
    ``multi_state`` cell is split on commas into a list of stripped strings.
    If a domain appears on several rows, the last row wins.

    Per the original author's note, the file is expected to contain ONLY
    govt / govt-maintained domains.
    """
    def blank_to_none(cell):
        # An empty CSV cell means "not known"; represent it as None.
        return None if cell == "" else cell

    plain_columns = ('place', 'county', 'state', 'country', 'district')
    domains = {}
    with open(path, "r") as handle:
        for record in csv.DictReader(handle):
            entry = {column: blank_to_none(record[column]) for column in plain_columns}
            raw_multi = record['multi_state']
            if raw_multi != "":
                entry['multi_state'] = [part.strip() for part in raw_multi.split(",")]
            else:
                entry['multi_state'] = None
            entry['type'] = blank_to_none(record['type'])
            domains[record['domain']] = entry
    return domains

In [7]:
# Abbreviation-to-state lookup, loaded once at module level.
# readData indexes it with the `state` column of each results CSV.
ABBR_TO_STATE_PATH = "/Users/brook/OneDrive/Desktop/Thesis/abbr_to_state.json"
with open(ABBR_TO_STATE_PATH, "r") as f:
    returnState = json.load(f)

In [8]:
def searchLocations(path):
    """
    Index the search-location CSV by a "place,state" string.

    path: csv file of search location information; must have at least the
        columns ``place`` and ``state``.

    The key is built from the part of ``place`` before the first "-" (whitespace
    stripped) joined to ``state`` with a comma, i.e. the information that can be
    recovered from a SERP. The value is the complete CSV row as a dictionary.
    Rows that produce the same key overwrite earlier ones.
    """
    with open(path, "r") as handle:
        return {
            ",".join((record['place'].split("-")[0].strip(), record['state'])): record
            for record in csv.DictReader(handle)
        }

In [9]:
def readData(filename, subset=False, queries=None):
    """
    Reads in a CSV of parsed organic results and groups consecutive rows into SERPs.

    CSVs look like this (one row per result, rows of one SERP adjacent to each other):
    date	state	location	query	domain	link	title	rank	position	clean_domain
    10/24/2022	AK	New Stuyahok	2022 ballot	https://ballotpedia.org	https://ballotpedia.org/2022_ballot_measures	2022 ballot measures - Ballotpedia	1	1	ballotpedia.org

    filename: path of the CSV file (read as UTF-8; the first row is treated as a header).
    subset: when truthy, the result is passed through ``makeSubset(data, queries)``
        (``makeSubset`` is not defined in the visible part of this notebook).
    queries: forwarded to ``makeSubset``; unused otherwise.

    Returns a list of dictionaries, one per SERP, with the keys ``date`` (slashes
    replaced by dashes), ``state`` (looked up in the module-level ``returnState``
    mapping), ``place``, ``query``, ``domains`` (last column of every row of the
    SERP, in file order) and ``serp`` ("date_state_place_query").

    Raises KeyError when a row's state value is not a key of ``returnState``.
    """
    data = []  # list of dictionaries, where each dictionary contains info for one SERP
    current = None  # the SERP dictionary currently being filled
    with open(filename, "r", encoding='utf-8') as f:
        r = csv.reader(f)
        next(r, None)  # skip the header; tolerate a completely empty file
        for row in r:
            if not row:  # csv.reader yields [] for blank lines
                continue
            state = returnState[row[1]]
            place = row[2]
            date = row[0].replace("/", "-")
            query = row[3]
            domain = row[-1]
            serp_info = "_".join([date, state, place, query])
            if current is None or serp_info != current['serp']:
                # A new SERP starts here. Register it immediately so that the last
                # SERP of the file is not lost when the loop ends.
                current = {'date': date, 'state': state, 'place': place,
                           'query': query, 'domains': [domain], 'serp': serp_info}
                data.append(current)
            else:
                current['domains'].append(domain)
    if subset:
        data = makeSubset(data, queries)
    return data

In [10]:
def mapLocationsToScores(searchLoc, seenDoms, domainLocs):
    """
    VERSION FOR IF NOT EVERY DOMAIN THAT HAS A PLACE HAS A COUNTY. ie If we don't geocode to fill in the info.

    For one SERP in a given search location, labels every result with its government
    type and with whether it matches the location of the search.

    searchLoc: search location information as dictionary; the keys ``state``,
        ``county``, ``place`` and ``district`` are read.
    seenDoms: list of domains on a given SERP
    domainLocs: dictionary of domain and location information, where keys are the
        domains and each value is a dictionary with ``type``, ``state``, ``county``,
        ``place`` and ``district`` (any of which may be None).

    Returns ``(result_types, local_to_search, in_state, mapping, final_dict)``.
    The three lists always have exactly one entry per domain in ``seenDoms``:
      * result_types: "non-govt" for domains missing from ``domainLocs``, otherwise
        the domain's ``type``.
      * in_state: None for non-govt and "Federal" results, otherwise whether the
        domain's state equals the search state (case-insensitive).
      * local_to_search: None for non-govt and "Federal" results; False when the
        domain is out of state; for in-state results True for "State", "Native" and
        "sen", a county / place / district comparison for "County" / "Local" /
        "rep", and None for any other type.
    ``mapping`` is the list of ``(domain, type, local, in_state)`` tuples and
    ``final_dict`` holds the same four lists under the keys ``seenDoms``,
    ``result_types``, ``local_to_search`` and ``in_state``.

    A location field that is missing (None) on either side can never match, so such a
    result is counted as not in state / not local instead of raising.
    """
    def same_text(left, right):
        # Case-insensitive equality that treats missing information as "no match".
        if left is None or right is None:
            return False
        return left.lower() == right.lower()

    result_types = []  # track what type of result (non-govt or type of govt)
    local_to_search = []  # whether the result is local to the location of the search
    in_state = []  # whether the result is in the correct state

    for dom in seenDoms:  # for each domain on the SERP
        if dom not in domainLocs:  # domain is not a govt/govt maintained domain
            result_types.append("non-govt")
            local_to_search.append(None)  # NA
            in_state.append(None)  # NA
            continue

        domInfo = domainLocs[dom]  # location information for this domain
        gov_type = domInfo['type']  # type of government

        if gov_type == "Federal":
            # Federal results are not tied to a location.
            is_local, is_in_state = None, None
        elif not same_text(domInfo['state'], searchLoc['state']):
            # govt domain is not in state (or its state is unknown)
            is_local, is_in_state = False, False
        else:
            is_in_state = True
            if gov_type in ("State", "Native", "sen"):
                is_local = True  # being in the right state is sufficient
            elif gov_type == "County":
                is_local = same_text(domInfo['county'], searchLoc['county'])
            elif gov_type == "Local":
                is_local = same_text(domInfo['place'], searchLoc['place'])
            elif gov_type == "rep":
                is_local = (domInfo['district'] == searchLoc['district'])
            else:
                # Unrecognised type: still record an entry so that the three lists
                # stay aligned with seenDoms.
                is_local = None

        result_types.append(gov_type)
        local_to_search.append(is_local)
        in_state.append(is_in_state)

    mapping = list(zip(seenDoms, result_types, local_to_search, in_state))
    final_dict = {"seenDoms": seenDoms,
                  "result_types": result_types,
                  "local_to_search": local_to_search,
                  "in_state": in_state
    }
    return result_types, local_to_search, in_state, mapping, final_dict

In [11]:
def proprotions(result_types):
    """Counts and proportions of each government level on one SERP.

    Location correctness is ignored here: a result counts towards its level no
    matter which state, county or place it belongs to.

    result_types: list with one label per result ("non-govt", "Federal", "State",
        "County", "Local", "Native", "rep" or "sen").

    Returns a dictionary with ``total_results``, a ``<level>_count`` entry and a
    ``<level>_prop`` entry (count divided by the number of results) for gov,
    federal, state, county, local, native, rep and sen. "gov" covers every label
    other than "non-govt".
    """
    total = len(result_types)

    # (output prefix, number of results at that level)
    tallies = [
        ("gov", total - result_types.count("non-govt")),
        ("federal", result_types.count("Federal")),
        ("state", result_types.count("State")),
        ("county", result_types.count("County")),
        ("local", result_types.count("Local")),
        ("native", result_types.count("Native")),
        ("rep", result_types.count("rep")),
        ("sen", result_types.count("sen")),
    ]

    results = {"total_results": total}
    for prefix, count in tallies:
        results[prefix + "_count"] = count
    for prefix, count in tallies:
        results[prefix + "_prop"] = count / total
    return results

In [12]:
def proprotions_correct(result_types, local_to_search):
    """Counts and proportions of govt results that match the search location.

    result_types: list with one government-type label per result.
    local_to_search: parallel list; only entries that are exactly ``True`` count.

    Returns a dictionary with ``<type>_correct`` counts for State, County, Native,
    Local, sen and rep, an ``Overall_correct`` count of all ``True`` entries, and a
    matching ``..._correct_prop`` entry for each (count divided by the number of
    results on the SERP). "Federal" and "non-govt" results are never counted
    per type.
    """
    tallies = {}
    for level in ("State", "County", "Native", "Local", "sen", "rep", "Overall"):
        tallies["_".join([level, "correct"])] = 0

    for position in range(len(result_types)):
        level = result_types[position]
        if level in ("Federal", "non-govt"):
            continue
        if local_to_search[position] is True:
            tallies["_".join([level, "correct"])] += 1

    tallies['Overall_correct'] = len([flag for flag in local_to_search if flag is True])

    shares = {}
    for name, count in tallies.items():
        shares["_".join([name, "prop"])] = count / len(result_types)

    combined = dict(tallies)
    combined.update(shares)
    return combined

In [13]:
def proportions_instate(result_types, in_state):
    """Counts and proportions of govt results that belong to the search state.

    result_types: list with one government-type label per result.
    in_state: parallel list; only entries that are exactly ``True`` count.

    Returns a dictionary with ``<type>_instate`` counts for State, County, Native,
    Local, sen and rep, an ``Overall_instate`` count of all ``True`` entries, and a
    matching ``..._instate_prop`` entry for each (count divided by the number of
    results on the SERP). "Federal" and "non-govt" results are never counted
    per type.
    """
    tallies = {}
    for level in ("State", "County", "Native", "Local", "sen", "rep", "Overall"):
        tallies["_".join([level, "instate"])] = 0

    for position in range(len(result_types)):
        level = result_types[position]
        if level in ("Federal", "non-govt"):
            continue
        if in_state[position] is True:
            tallies["_".join([level, "instate"])] += 1

    tallies['Overall_instate'] = len([flag for flag in in_state if flag is True])

    shares = {}
    for name, count in tallies.items():
        shares["_".join([name, "prop"])] = count / len(result_types)

    combined = dict(tallies)
    combined.update(shares)
    return combined

In [14]:
def join_dicts(props, props_correct, props_instate):
    """Merge the three per-SERP result dictionaries into one new dictionary.

    Keys keep the order props, props_correct, props_instate; if a key occurs in
    more than one input, the value from the later argument is used. The inputs
    are not modified.
    """
    return {**props, **props_correct, **props_instate}

In [15]:
def doOne(searchLoc, seenDoms, domainLocs, serp, txtFile, csvFile):
    """
    Process a single SERP: label its results, compute all proportions and log them.

    searchLoc: dictionary of location information for the place the search was run from
    seenDoms: list of domains on the SERP
    domainLocs: dictionary of govt-domain location information keyed by domain
    serp: SERP identifier string, passed through to toFiles
    txtFile: text log that receives the per-domain mapping
    csvFile: csv file that receives the row of proportions
    """
    # The fifth return value (dictionary form of the same lists) is not needed here.
    labels, is_local, is_in_state, mapping, _ = mapLocationsToScores(searchLoc, seenDoms, domainLocs)

    all_props = join_dicts(
        proprotions(labels),
        proprotions_correct(labels, is_local),
        proportions_instate(labels, is_in_state),
    )

    toFiles(txtFile, csvFile, mapping, all_props, serp)

In [16]:
def getDate(serp):
    """Get the date from a serp string and return it as "YYYY-MM-DD".

    serp: string whose part before the first "_" is a date written as
        month-day-year, e.g. "10-24-22_..." or "10-24-2022_...". A bare date
        string without any "_" works as well.

    Both two-digit and four-digit years are accepted: readData builds the serp
    string directly from the CSV date column, whose documented format
    ("10/24/2022") has a four-digit year.

    Raises ValueError when the date part matches neither format.
    """
    dateStr = serp.split("_")[0]
    try:
        parsed = datetime.strptime(dateStr, "%m-%d-%y")
    except ValueError:
        # "%y" consumes exactly two digits, so a four-digit year lands here.
        parsed = datetime.strptime(dateStr, "%m-%d-%Y")
    return parsed.strftime("%Y-%m-%d")

In [17]:
def toFiles(txtName, csvName, mapping, props_dict, serp):
    """
    Appends the mapping information to a text file and the proportions to a CSV.

    txtName: name of text file; one line "<serp>; <mapping>" is appended
    csvName: name of the csv file; one data row is appended (no header is written here)
    mapping: mapping of doms to their labels, written to the text file as-is
    props_dict: dictionary of counts/proportions for this SERP. It is modified in
        place: the keys ``serp``, ``date``, ``state``, ``place`` and ``query`` are added.
        Every key must be one of the CSV field names, otherwise csv.DictWriter raises
        ValueError.
    serp: serp info formatted as "date_state_place_query"
    """
    # Split into at most four parts so that underscores inside the query (the last
    # component) do not truncate it.
    parts = serp.split("_", 3)
    props_dict['serp'] = serp
    props_dict['date'] = getDate(parts[0])
    props_dict['state'] = parts[1]
    props_dict['place'] = parts[2]
    props_dict['query'] = parts[3]

    fieldnames = ["serp","date","state","place", "query","total_results","gov_count","federal_count","state_count","county_count","local_count","native_count",
                  "rep_count", "sen_count", "gov_prop","federal_prop","state_prop","county_prop","local_prop","native_prop", "rep_prop", "sen_prop",
                  "State_correct","County_correct","Native_correct","Local_correct","sen_correct","rep_correct","Overall_correct","State_correct_prop","County_correct_prop","Native_correct_prop","Local_correct_prop",
                  "sen_correct_prop","rep_correct_prop","Overall_correct_prop","State_instate","County_instate","Native_instate","Local_instate",
                  "sen_instate","rep_instate","Overall_instate","State_instate_prop","County_instate_prop","Native_instate_prop","Local_instate_prop","sen_instate_prop",
                  "rep_instate_prop","Overall_instate_prop"]

    # Both logs are written as UTF-8 so the output does not depend on the platform's
    # default encoding.
    with open(txtName, "a", encoding="utf-8") as f:
        f.write(f"{serp}; {mapping}\n")
    with open(csvName, "a", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=fieldnames)
        w.writerow(props_dict)

In [27]:
def run():
    """
    Run the whole pipeline over every file found under the results folder.

    For each input file this reads the SERPs (readData), creates a fresh
    "<file stem>_fact_proportions_logging.csv" with a header row in the log folder,
    and calls doOne for every SERP, which appends one CSV row and one line to
    "<file stem>_facct_logging.txt". The text log is opened in append mode, so it
    accumulates across runs while the CSV is rewritten.

    A SERP that fails (including a search location that is missing from the
    location file) is reported with print() and skipped; processing continues with
    the next SERP.
    """
    folder = "/Users/brook/OneDrive/Desktop/Thesis/organic_clean" # FOLDER WITH CSVS
    locPath = "/Users/brook/OneDrive/Desktop/Thesis/search_locs_metric_final.csv"  # csv of search location information
    domPath = "GovDomainCleaning/all_gov_clean_facct2.csv"  # csv of domain location information
    logFolder = "facct_results"
    txtFile = "facct_logging.txt"  # logging files
    csvFile = "fact_proportions_logging.csv"

    fieldnames = ["serp","date","state","place", "query","total_results","gov_count","federal_count","state_count","county_count","local_count","native_count",
                  "rep_count", "sen_count", "gov_prop","federal_prop","state_prop","county_prop","local_prop","native_prop", "rep_prop", "sen_prop",
                  "State_correct","County_correct","Native_correct","Local_correct","sen_correct","rep_correct","Overall_correct","State_correct_prop","County_correct_prop","Native_correct_prop","Local_correct_prop",
                  "sen_correct_prop","rep_correct_prop","Overall_correct_prop","State_instate","County_instate","Native_instate","Local_instate",
                  "sen_instate","rep_instate","Overall_instate","State_instate_prop","County_instate_prop","Native_instate_prop","Local_instate_prop","sen_instate_prop",
                  "rep_instate_prop","Overall_instate_prop"]

    subset=False
    queries=[]

    # The output folder may not exist yet (e.g. on a fresh checkout); opening the log
    # files below would otherwise fail with FileNotFoundError.
    os.makedirs(logFolder, exist_ok=True)

    domainLocs = domainLocations(domPath)  # keys are domain, value is dict of location info
    allSearchLocs = searchLocations(locPath)  # keys are location string (place, state), value is dict of location info
    for root, _dirs, files in os.walk(folder):
        for f in files:
            print(f"Starting {f}")
            path = os.path.join(root, f)
            data = readData(path, subset, queries)  # list of dictionaries, where each dict is a serp
            stem = f.split(".")[0]
            txt = os.path.join(logFolder, "_".join([stem, txtFile]))
            csvF = os.path.join(logFolder, "_".join([stem, csvFile]))

            # make csv with header
            with open(csvF, "w", newline="", encoding="utf-8") as file:
                w = csv.DictWriter(file, fieldnames=fieldnames)
                w.writeheader()
            # calculations per serp
            for dct in data:
                serp = dct['serp']
                doms = dct['domains']  # list of domains for this serp
                this_loc = ",".join([dct['place'], dct['state']])  # place and state of search location
                try:
                    # The lookup lives inside the try so that one unknown search
                    # location is reported and skipped like any other per-SERP
                    # failure instead of aborting the whole batch.
                    search_loc = allSearchLocs[this_loc]  # dictionary of search loc info for this location
                    doOne(search_loc, doms, domainLocs, serp, txt, csvF)
                except Exception as e:
                    print(e)
                    print(serp)
                    print(doms)
            print(f"Done {f}")
    print("Finished all")


Main

In [28]:
def main():
    """Entry point of the notebook: delegates to run()."""
    run()

In [None]:
# Executing this cell starts the pipeline by calling main().
main()