In [333]:
import json
import os
from dolma.taggers.url import BaseDomainTagger
from dolma.core.paths import mkdir_p, parent, cached_path
from typing import NamedTuple, Set
import csv
from collections import defaultdict, Counter
import smart_open

In [334]:
class Matched(NamedTuple):
    """One report domain that was found in the Wikidata URL table."""
    # Name derived from the report file the domain was read from.
    source: str
    # Domain string exactly as it appears in the report.
    url: str
    # Count listed next to the domain in the report.
    # Note: as a field name this shadows the built-in tuple.count method.
    count: int
    # Wikidata record for the matched URL; a dict with "label" and "description" keys.
    wiki: dict

class AggItem(NamedTuple):
    """Immutable running aggregate for one domain across several reports."""
    # Names of every report the domain has been seen in so far.
    sources: Set[str]
    # Largest count seen for the domain in any single report.
    # Note: as a field name this shadows the built-in tuple.count method.
    count: int

    def update(self, source: str, count: int) -> 'AggItem':
        """Return a new AggItem that also accounts for ``source`` / ``count``.

        The receiver is left untouched: the source set is copied with
        ``source`` added, and the stored count becomes the larger of the
        current and the new value.
        """
        merged_sources = self.sources.union((source,))
        highest = count if count > self.count else self.count
        return AggItem(merged_sources, highest)

In [335]:
# --- Configuration ---------------------------------------------------------
# Single root for every local input/output path used below; change this one
# line to run the notebook on another machine.
documents_root = "/Users/lucas/Documents"

# Directory holding the per-report JSON files.
url_reports = f"{documents_root}/v1_6url-reports/v1_6-reports"
# Directory holding one sub-directory per Wikidata concept (each with a response.json).
wikidata = f"{documents_root}/wikidata/wikidata-20220208"

# Report names this notebook has been pointed at. Kept as data instead of a
# chain of overwritten assignments so the active one is unambiguous.
report_name_choices = (
    "domain_blocklist_utp_v1",
    "blocklist_hosts_adware_malware_v1",
    "blocklist_hosts_social_v1",
    "blocklist_hosts_porn_v1",
    "blocklist_hosts_gambling_v1",
    "blocklist_hosts_fakenews_v1",
)
# The report processed by this run.
report_name = "blocklist_hosts_fakenews_v1"

# Remote word list; a later cell downloads it and rebinds `bad_words` to a set.
bad_words = "https://raw.githubusercontent.com/LDNOOBW/List-of-Dirty-Naughty-Obscene-and-Otherwise-Bad-Words/master/en"

# Output CSV sheets, one folder per report.
sheet_wiki = f"{documents_root}/url_sheets/{report_name}/wikidata.csv"
sheet_rest = f"{documents_root}/url_sheets/{report_name}/rest.csv"

In [336]:
# Load every JSON report in `url_reports` whose file name contains
# `report_name`; the result maps file name -> parsed JSON content.
reports = {}
# sorted(): os.listdir returns entries in arbitrary order, and the order of
# `reports` decides how ties are broken when rows are sorted by count later.
for report in sorted(os.listdir(url_reports)):
    if not report.endswith(".json") or report_name not in report:
        continue
    # Explicit encoding so parsing does not depend on the platform default.
    with open(os.path.join(url_reports, report), encoding="utf-8") as f:
        reports[report] = json.load(f)

# `bad_words` arrives here as the URL of the word list and leaves as the set
# of its lines. The isinstance guard makes the cell safe to re-run: on a
# second execution the name already holds the set and must not be passed to
# cached_path again.
if isinstance(bad_words, str):
    with open(cached_path(bad_words), encoding="utf-8") as f:
        bad_words = set(f.read().splitlines())

# Substrings that disqualify a Wikidata entry when they occur anywhere in its
# description. Grouped by theme; each term is listed once.
bad_wiki_words = {
    # adult content
    "sex", "adult", "porn", "camming", "escort", "hentai", "nudity", "dating",
    # humour / satire / gossip
    "satire", "humor", "joke", "comedy", "comic", "gossip", "tabloid", "celebrity",
    # gambling
    "betting", "casino", "gambling",
    # image boards, hosting and social platforms
    "image", "imageboard", "image hosting", "4chan", "social media",
    # miscellaneous
    "tracker", "freemium", "crowdfunding", "online database",
}
# Descriptions that, when they make up the entire text, are treated the same
# as a missing description.
no_description = {"company", "website"}

In [337]:
# Build the lookup table: cleaned URL -> {"label", "description"} for every
# Wikidata entry whose description is present, informative and free of the
# disqualifying substrings in `bad_wiki_words`.
wikidata_urls = {}
# Entries dropped because they were malformed or their URL could not be cleaned.
wikidata_skipped = 0

# sorted(): a later entry overwrites an earlier one for the same URL, so a
# fixed traversal order keeps the winning record stable between runs.
for concept in sorted(os.listdir(wikidata)):
    concept_dir = os.path.join(wikidata, concept)
    if not os.path.isdir(concept_dir):
        continue
    with open(os.path.join(concept_dir, 'response.json'), encoding="utf-8") as f:
        entries = json.load(f)

    for data in entries:
        # `or ""` also covers an explicit null description in the JSON.
        raw_description = data.get("description") or ""
        description = raw_description.strip()
        # NOTE(review): matching is a case-sensitive substring test, so "sex"
        # also hits e.g. "Essex" while "Adult ..." is not caught -- confirm
        # this is the intended strictness.
        if (
            not description
            or description in no_description
            or any(bw in description for bw in bad_wiki_words)
        ):
            continue
        try:
            for url in BaseDomainTagger.clean_url(data['url']):
                wikidata_urls[url] = {
                    "label": data["itemLabel"], "description": raw_description
                }
        except Exception:
            # Best effort: one bad entry must not abort the scan. Unlike a
            # bare `except`, this still lets KeyboardInterrupt through, and
            # the drop is counted instead of vanishing silently.
            wikidata_skipped += 1

if wikidata_skipped:
    print(f"Skipped {wikidata_skipped} Wikidata entries that could not be processed")

In [338]:
import nltk

# stopwords / wordnet back the stop-word filter and the lemmatizer below.
# word_tokenize additionally needs the Punkt tokenizer data, which this cell
# previously did not fetch and therefore failed on a fresh environment
# ("punkt" for older NLTK releases, "punkt_tab" for newer ones).
for resource in ("stopwords", "wordnet", "punkt", "punkt_tab"):
    nltk.download(resource)

from nltk.stem import WordNetLemmatizer
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords

wnl = WordNetLemmatizer()
stop_words = set(stopwords.words('english'))

# Frequency of lemmatized, non-stop-word tokens over all kept Wikidata
# descriptions; used to spot candidate terms for `bad_wiki_words`.
coll = Counter()

for row in wikidata_urls.values():
    if not row["description"]:
        continue

    # Keep purely alphabetic tokens, lower-cased, then drop stop words
    # before lemmatizing.
    words = [w.lower() for w in word_tokenize(row["description"]) if w.isalpha()]
    words = [wnl.lemmatize(w) for w in words if w not in stop_words]
    coll.update(words)

# for k, v in coll.most_common(100):
#     print(f"{v:5d}\t{k}")

[nltk_data] Downloading package stopwords to /Users/lucas/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to /Users/lucas/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


In [None]:
# Collect every report domain that can be found in `wikidata_urls`, trying
# the domain as listed, then its last two labels, then that with "www.".
matched = []
json_suffix = ".json"
for name, report in reports.items():
    # Drop the file extension. str.strip(".json") must not be used here: it
    # removes any of the characters ".", "j", "s", "o", "n" from BOTH ends
    # (e.g. "social.json" -> "cial") rather than the suffix.
    source = name[:-len(json_suffix)] if name.endswith(json_suffix) else name

    for url, count in report['domains']:
        # NOTE(review): taking the last two labels is a heuristic; for
        # multi-part public suffixes ("example.co.uk") it yields "co.uk".
        base_url = '.'.join(url.split('/')[0].split('.')[-2:])
        candidates = (url, base_url, f'www.{base_url}')

        matched_url = next((c for c in candidates if c in wikidata_urls), None)
        if matched_url is None:
            continue

        matched.append(Matched(source, url, count, wikidata_urls[matched_url]))

In [None]:
# Write the Wikidata-matched domains to `sheet_wiki`, highest count first,
# one row per distinct URL. `seen` is kept at module level because the next
# cell uses it to exclude these URLs from the second sheet.
seen = set()

wiki_columns = ["url", "count", "allow?", "label", "source", "description", "notes"]

mkdir_p(parent(sheet_wiki))
# newline="" is what the csv module requires of the file object so that its
# own "\r\n" row terminator is not translated a second time on Windows.
with smart_open.open(sheet_wiki, 'wt', encoding="utf-8", newline="") as f:
    writer = csv.DictWriter(f, fieldnames=wiki_columns)
    writer.writeheader()
    for match in sorted(matched, key=lambda x: x.count, reverse=True):
        # Rows arrive sorted by count, so the first occurrence of a URL is
        # the one with the highest count; later duplicates are dropped.
        if match.url in seen:
            continue
        seen.add(match.url)
        # "allow?" is written empty and "notes" is omitted (DictWriter then
        # emits an empty cell for it).
        writer.writerow({
            "source": match.source,
            "url": match.url,
            "count": match.count,
            "allow?": "",
            "label": (match.wiki['label'] or "  "),
            "description": (match.wiki['description'] or "  ")
        })

In [None]:
# Write the domains that were NOT matched against Wikidata to `sheet_rest`,
# aggregated over all loaded reports.

# Domains whose best count is below this threshold are left out of the sheet.
min_count = 500

# url -> AggItem(report names it appears in, largest count in any report)
agg = defaultdict(lambda: AggItem(set(), 0))

for name, report in reports.items():
    for url, count in report["domains"]:
        if url not in seen:
            agg[url] = agg[url].update(name, count)

# Do not rely on an earlier cell having created the output folder.
mkdir_p(parent(sheet_rest))
# newline="" is what the csv module requires of the file object so that its
# own "\r\n" row terminator is not translated a second time on Windows.
with smart_open.open(sheet_rest, 'wt', encoding="utf-8", newline="") as f:
    writer = csv.DictWriter(f, fieldnames=["url", "count", "allow?", "sources"])
    writer.writeheader()
    for url, agg_item in sorted(agg.items(), key=lambda x: x[1].count, reverse=True):
        if agg_item.count < min_count:
            continue
        # NOTE(review): source names here keep their ".json" extension,
        # unlike the "source" column of the Wikidata sheet -- confirm
        # whether the two sheets should agree.
        writer.writerow({
            "url": url,
            "allow?": "",
            # sorted(): set iteration order varies between runs; sorting
            # keeps the column reproducible.
            "sources": ",".join(sorted(agg_item.sources)),
            "count": agg_item.count
        })