In [0]:
!pip install tqdm

In [0]:
from time import sleep

import matplotlib.pyplot as plt
import pandas as pd
import requests
from pyspark.sql import functions as F
from pyspark.sql.types import DataType, NullType
from tqdm import tqdm
# Register tqdm's pandas integration so `progress_apply` (used later in this
# notebook) is available on pandas objects and shows a progress bar.
tqdm.pandas()

In [0]:
# Select English-language, disclosed Public Expenditure / Public Finance Review
# reports for a fixed list of countries, newest first. Derivative documents
# (summaries, presentations, chapters, modules, ...) are excluded by name.
#
# The original filter only excluded the misspelling 'Guidence Note', so correctly
# spelled "Guidance Note" documents slipped through. Both spellings are excluded
# now (the misspelled pattern is kept in case source titles carry the typo).
#
# NOTE(review): `doc_date != 'Disclosed'` compares a date column against the
# string 'Disclosed' -- confirm this is intended and not a copy/paste slip.
query = """
SELECT *
FROM prd_corpdata.dm_reference_gold.v_dim_imagebank_document
WHERE (
    (doc_name ILIKE '%Public expenditure review%' OR
    doc_name ILIKE '%Public finance review%') AND
    --doc_name ILIKE '%(PER)%' OR
    --doc_name ILIKE '%(PFR)%'
    doc_type_name in ('Report', 'Public Expenditure Review') AND
    doc_name not ILIKE '%Summary%' AND
    doc_name not ILIKE '%Synthesis%' AND
    doc_name not ILIKE '%Presentation%' AND
    doc_name not ILIKE '%Infographic%' AND
    doc_name not ILIKE '%Guidence Note%' AND
    doc_name not ILIKE '%Guidance Note%' AND
    doc_name not ILIKE '%Chapter%' AND
    doc_name not ILIKE '%Module%' AND
    lang_name = 'English' AND
    cntry_name in ('Ghana','Ghana|N/A', 'Pakistan', 'Congo, Democratic Republic of','Kenya','Africa|Kenya', 
                    'Tunisia', 'Malawi','Uganda','Cambodia')
)
AND doc_date != 'Disclosed'
AND disclsr_stat_name = 'Disclosed'
ORDER BY doc_date DESC
"""
# NOTE(review): `select_countries` is not referenced in the visible cells; kept
# in case a later cell relies on it.
select_countries = []
# Run the query on Spark and pull the (filtered) result into a pandas DataFrame.
filtered_df = spark.sql(query).toPandas()

In [0]:
# Display the query result (the original cell referenced the misspelled name
# `filterd_df`, which raises NameError on a fresh kernel).
filtered_df

In [0]:
# Accumulates (url, reason) pairs for every download that did not succeed.
failed_urls = []


def fetch_text(url):
    """Download the body of ``url`` as text.

    Returns the response text on an HTTP 200. For anything else -- an empty or
    non-string URL, a non-200 status, or any exception raised along the way --
    a ``(url, reason)`` tuple is appended to the module-level ``failed_urls``
    list and ``None`` is returned. The request uses a 10 second timeout.
    """

    def _fail(reason):
        # Record the failure and hand back the None that callers receive.
        failed_urls.append((url, reason))
        return None

    try:
        # Truthiness is evaluated before the type check, as in the original.
        if not (url and isinstance(url, str)):
            return _fail("Invalid URL")
        resp = requests.get(url, timeout=10)
        if resp.status_code == 200:
            return resp.text
        return _fail(f"Status code: {resp.status_code}")
    except Exception as exc:
        # Best-effort fetch: keep going, but remember why this one failed.
        return _fail(str(exc))
    
# Fetch the extracted text for every document (with a progress bar) and store
# it next to the metadata; failed downloads end up as None in `doc_text`.
url_column = filtered_df["ext_text_url"]
filtered_df["doc_text"] = url_column.progress_apply(fetch_text)

# Summarize what could not be downloaded, one line per failure.
failure_count = len(failed_urls)
print(f"Total failed fetches: {failure_count}")
for failure in failed_urls:
    bad_url, why = failure
    print(f"Failed: {bad_url} — {why}")


In [0]:
database_name = "prd_mega.sboost4"

# Create the target database on first use.
if not spark.catalog.databaseExists(database_name):
    print(f"Database '{database_name}' does not exist. Creating the database.")
    spark.sql(f"CREATE DATABASE {database_name}")

sdf = spark.createDataFrame(filtered_df)
# Columns inferred as NullType ("void") are cast to string before writing
# (presumably because the table format cannot store void columns -- confirm).
# The previous check compared str(dataType) to 'void', but str() of a PySpark
# type is its class repr (e.g. "NullType()"), so it never matched and no column
# was ever cast. Test the type itself instead.
sdf = sdf.select([
    F.col(field.name).cast("string") if isinstance(field.dataType, NullType) else F.col(field.name)
    for field in sdf.schema.fields
])
# Replace the document table with the freshly fetched data.
sdf.write.mode("overwrite").saveAsTable(f"{database_name}.per_pfr_document_data")

In [0]:
# Since the extracted text is in complicated formats, we use a simple
# fixed-width character chunker to start.
def chunk_text_by_chars(text, max_chars=5000):
    """Split ``text`` into consecutive pieces of at most ``max_chars`` characters.

    Each piece is whitespace-stripped, and pieces that are empty after
    stripping are dropped.

    Args:
        text: The document text. ``None`` or a float NaN (how a missing value
            shows up in a pandas column, e.g. after a failed download) is
            treated as "no text".
        max_chars: Maximum length of each piece before stripping.

    Returns:
        A list of non-empty chunk strings in document order; an empty list
        when there is no text.
    """
    # Missing text used to crash on len(None); it now simply yields no chunks.
    if text is None or (isinstance(text, float) and text != text):
        return []
    pieces = (
        text[start:start + max_chars].strip()
        for start in range(0, len(text), max_chars)
    )
    return [piece for piece in pieces if piece]

# Build one record per chunk: the owning document's node_id (as a string), the
# chunk's position within that document, and the chunk text.
chunk_data = []
for _, doc in filtered_df.iterrows():
    doc_id = str(doc['node_id'])
    pieces = chunk_text_by_chars(doc['doc_text'])
    chunk_data.extend(
        {'node_id': doc_id, 'chunk_id': position, 'chunk_text': piece}
        for position, piece in enumerate(pieces)
    )

chunks_df = pd.DataFrame(chunk_data)

database_name = "prd_mega.sboost4"

# Create the target database on first use.
database_missing = not spark.catalog.databaseExists(database_name)
if database_missing:
    print(f"Database '{database_name}' does not exist. Creating the database.")
    spark.sql(f"CREATE DATABASE {database_name}")

# Replace the chunk table with the freshly built chunks.
sdf_chunks = spark.createDataFrame(chunks_df)
chunks_writer = sdf_chunks.write.mode("overwrite")
chunks_writer.saveAsTable(f"{database_name}.per_pfr_chunks")