In [1]:
import os
import re
import pandas as pd
import json

from tqdm import tqdm
from elasticsearch7 import Elasticsearch
import concurrent.futures

# Task2: Link Graph
You should also write a link graph reporting all out-links from each URL you crawl, all the inlinks you have encountered (obviously there will be inlinks on the web that you dont discover). This will be used in a future assignment to calculate PageRank for your collection.

option 1 : We prefer that you store the canonical links as two fields “inlinks” and “outlinks” in ElasticSearch, for each document. You will have to manage these fields appropriately, such that when you are done, your team has correct links for all document crawled.

option 2: maintain a separate links file (you can do this even if you also do option1). Each line of this file contains a tab-separated list of canonical URLs. The first URL is a document you crawled, and the remaining URLs are out-links from the document. When all team members are finished with their crawls, you should merge your link graphs. Only submit one file, containing this merged graph. During the merge process, reduce any URL which was not crawled to just a domain.

In [2]:
path = "trial_run"
document_name = os.path.join(path, "documents.txt")
inlinks_name = os.path.join(path, "inlinks_backup.json")
outlinks_name = os.path.join(path, "outlinks.json")

INDEX_NAME = "crawler"
es = Elasticsearch(cloud_id= "0feeb24636464a578a9c7a1ce9739181:dXMtY2VudHJhbDEuZ2NwLmNsb3VkLmVzLmlvOjQ0MyQyMzcyNjZmYzcwMzg0ZTA2OTM1MTJkZGIxMDgzYTRmMyQ1N2RhZjIzZTNiMWM0MjAwYjBhMDQ0MGY1ZTEyZTc2Yw==",
                   http_auth=("elastic", "pETnMazDlmfyCT2rZ2NAWh2V"))

es.ping()

True

In [5]:
# Read the content of the document.txt file
with open(document_name, "r") as file:
    data = file.read()

with open(inlinks_name, "r") as file:
    inlinks = json.load(file)

# load the outlinks file and replace } with }\n
with open(outlinks_name, 'r') as f:
    outlinks = f.read().replace(']}', ']}\n')
outlinks_raw = outlinks.split('\n')[:-1]

outlinks = {}
for outlink in outlinks_raw:
    outlink = json.loads(outlink)
    outlinks.update(outlink)


# Split the content into individual documents
documents = re.findall(r'<DOC>(.*?)</DOC>', data, re.DOTALL)

# Initialize lists to store extracted information
docnos = []
wavenos = []
outlinknos = []
texts = []
titles = []

# Parse each document
for doc in documents:
    # Extract DOCNO
    docno_match = re.search(r'<DOCNO>(.*?)</DOCNO>', doc, re.DOTALL)
    if docno_match:
        docno = docno_match.group(1)
    else:
        docno = None
    
    # Extract WAVENO
    waveno_match = re.search(r'<WAVENO>(.*?)</WAVENO>', doc, re.DOTALL)
    if waveno_match:
        waveno = waveno_match.group(1)
    else:
        waveno = None
    
    # Extract OUTLINKNO
    outlinkno_match = re.search(r'<OUTLINKNO>(.*?)</OUTLINKNO>', doc, re.DOTALL)
    if outlinkno_match:
        outlinkno = outlinkno_match.group(1)
    else:
        outlinkno = None
    
    # Extract TEXT
    text_match = re.search(r'<TEXT>(.*?)</TEXT>', doc, re.DOTALL)
    if text_match:
        text = text_match.group(1).strip()
    else:
        text = None
    
    # Extract TITLE
    title_match = re.search(r'<TITLE>(.*?)</TITLE>', doc, re.DOTALL)
    if title_match:
        title = title_match.group(1)
    else:
        title = None
    
    # Append extracted information to lists
    docnos.append(docno)
    wavenos.append(waveno)
    outlinknos.append(outlinkno)
    texts.append(text)
    titles.append(title)

# Create a DataFrame
df = pd.DataFrame({
    "DOCNO": docnos,
    "WAVENO": wavenos,
    "OUTLINKNO": outlinknos,
    "TEXT": texts,
    "TITLE": titles
})

df['URL'] = df['DOCNO'].apply(lambda x: x.split(':',1)[-1])
df['DOCNO'] = df['DOCNO'].apply(lambda x: x.split(':',1)[0])

inlink_counts = {url : len(inlinks[url]) for url in inlinks.keys()}
df['INLINK_COUNT'] = df['URL'].map(inlink_counts)

df['INLINKS'] = df['URL'].map(inlinks)
df['OUTLINKS'] = df['URL'].map(outlinks)

df['WAVENO'] = df['WAVENO'].astype('int')
df['OUTLINKNO'] = df['OUTLINKNO'].astype('int')
df['INLINK_COUNT'].fillna(0, inplace=True)
df['INLINK_COUNT'] = df['INLINK_COUNT'].astype('int')
df['OUTLINKS'] = df['OUTLINKS'].apply(lambda x: x if isinstance(x, list) else [])
df['INLINKS'] = df['INLINKS'].apply(lambda x: x if isinstance(x, list) else [])
df['TITLE'].fillna("", inplace=True)

# arange the columns
df = df[['DOCNO', 'URL', 'WAVENO', 'OUTLINKNO', 'INLINK_COUNT', 'TITLE', 'TEXT', 'INLINKS', 'OUTLINKS']]
df.head()

Unnamed: 0,DOCNO,URL,WAVENO,OUTLINKNO,INLINK_COUNT,TITLE,TEXT,INLINKS,OUTLINKS
0,ANSON-1,https://en.wikipedia.org/wiki/2009_swine_flu_p...,0,602,2,2009 swine flu pandemic - Wikipedia,"The 2009 swine flu pandemic, caused by the H1N...",[https://en.wikiquote.org/wiki/Swine_influenza...,[https://www.nytimes.com/2009/05/28/health/pol...
1,ANSON-2,http://www.cnn.com/2009/HEALTH/05/28/us.china....,1,2,2,China quarantines U.S. school group over flu c...,(CNN) -- A group of students and teachers fro...,[https://en.wikipedia.org/wiki/2009_H1N1_flu_o...,"[http://www.sphere.com/, http://politicalticke..."
2,ANSON-3,http://hxnxflu.blogspot.com/2009/05/can-scienc...,1,139,48,H1N1 Flu by crabsallover: Can science save us ...,Read more: Special report on swine fluIS THE w...,[http://hxnxflu.blogspot.com/2009/05/buy-tamif...,[http://www.direct.gov.uk/en/groups/dg_digital...
3,ANSON-4,https://www.cdc.gov/h1n1flu/information_h1n1_v...,1,55,3,CDC H1N1 Flu | Origin of 2009 H1N1 Flu (Swine ...,Content on this page was developed during the ...,[https://en.wikipedia.org/wiki/2009_H1N1_flu_o...,"[https://www.hhs.gov/, https://www.cdc.gov/az/..."
4,ANSON-5,https://simple.wikipedia.org/wiki/2009_swine_f...,1,87,2,2009 swine flu pandemic - Simple English Wikip...,The 2009 flu pandemic or swine flu was an infl...,[https://en.wikipedia.org/wiki/2009_H1N1_flu_o...,[https://fa.wikipedia.org/wiki/%D8%AF%D9%86%DB...


In [4]:
if es.indices.exists(index=INDEX_NAME):
    es.indices.delete(index=INDEX_NAME)

In [6]:
with open('stoplist.txt', 'r') as f:
    stop_words = f.read().splitlines()

configurations = {
    "settings" : {
        "number_of_shards": 1,
        "number_of_replicas": 1,
        "analysis": {
            "filter": {
                "english_stop": {
                    "type": "stop",
                    "stopwords" : stop_words
                }
            },
            "analyzer": {
                "stopped": {
                    "type": "custom",
                    "tokenizer": "standard",
                    "filter": [
                        "lowercase",
                        "stemmer",
                        "english_stop"
                    ]
                }
            }
      }
    },
    "mappings": {
        "properties": {
            "title" : {
                "type": "text",
                "fielddata": True,
                "analyzer": "stopped",
                "index_options": "positions"
            },

            "content": {
                "type": "text",
                "fielddata": True,
                "analyzer": "stopped",
                "index_options": "positions"
            },
            "author": {"type": "keyword"},
            "inlink_no": {
                "type": "integer"
            },
            "outlink_no": {
                "type": "integer"
            },
            "inlinks": {
                "type": "keyword",
            },
            "outlinks": {
                "type": "keyword",
            }
        }
    }
}
# delete the index if it exists
if es.indices.exists(index=INDEX_NAME):
    es.indices.delete(index=INDEX_NAME)

es.indices.create(index=INDEX_NAME, body = configurations)

def add_data(id, title, text, inlink_no, outlink_no, inlinks, outlinks):
    es.index(index=INDEX_NAME, id=id, document={'author' : ['Anson'], 'title': title, 'content': text, 'inlink_no' : inlink_no, 'outlink_no' : int(outlink_no), 'inlinks' : inlinks, 'outlinks' : outlinks})

def add_data_wrapper(row):
    add_data(row['URL'], row['TITLE'], row['TEXT'], row['INLINK_COUNT'], row['OUTLINKNO'], row['INLINKS'], row['OUTLINKS'])

print("Indexing documents")
# Loop through all documents
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
    print("Creating Metadata for threads")
    futures = [executor.submit(add_data_wrapper, row) for _, row in tqdm(df.iterrows(), total = df.shape[0])]
    for future in tqdm(concurrent.futures.as_completed(futures), total=len(futures)):
        pass  # Ensure all tasks are completed

  es.indices.create(index=INDEX_NAME, body = configurations)


Indexing documents
Creating Metadata for threads


100%|██████████| 124420/124420 [00:43<00:00, 2855.92it/s] 
100%|██████████| 124420/124420 [10:00<00:00, 207.33it/s]
