In [11]:
pip install schedule

Collecting schedule
  Downloading schedule-1.2.2-py3-none-any.whl.metadata (3.8 kB)
Downloading schedule-1.2.2-py3-none-any.whl (12 kB)
Installing collected packages: schedule
Successfully installed schedule-1.2.2
Note: you may need to restart the kernel to use updated packages.


In [12]:
pip install lxml

Note: you may need to restart the kernel to use updated packages.


In [13]:
pip install html5lib

Note: you may need to restart the kernel to use updated packages.


In [None]:
import requests
import pandas as pd
import datetime
import pytz
import schedule
import time
from datetime import datetime
from io import StringIO
import os
import json

In [None]:
# Header names each exchange's price sheet is expected to contain. The scrapers
# compare the scraped header row against these lists and log a warning for any
# column that is missing or new.
EXPECTED_COLUMNS_ZSE = [
    'Company Name',
    'Opening Price',
    'Closing Price',
    'Total Traded Volume',
]
EXPECTED_COLUMNS_VIC = [
    'Company Name',
    'Opening Price',
    'Closing Price',
    'Total Traded Volume',
]

class ScrapeLogger:
    """Collect the outcome of one scrape run and append it to a JSON history file.

    The run record is kept in ``self.log`` with these keys:

    * ``timestamp`` - ISO-8601 creation time in the Africa/Harare timezone.
    * ``status``    - ``"success"``, ``"partial"`` or ``"failure"`` for the whole run.
    * ``sources``   - latest result per source name, written by :meth:`log_source`.
    * ``warnings``  - free-form warning messages.
    * ``errors``    - free-form error messages; any entry forces status ``"failure"``.
    """

    def __init__(self):
        self.log = {
            "timestamp": datetime.now(pytz.timezone('Africa/Harare')).isoformat(),
            "status": "success",
            "sources": {},
            "warnings": [],
            "errors": []
        }

    def log_source(self, name, status, rows=0, columns=None, error=None):
        """Record the latest result for source ``name`` and refresh the run status.

        Args:
            name: Source identifier; a later call with the same name replaces the entry.
            status: ``"failure"`` marks the source as failed; any other value does not.
            rows: Number of rows scraped.
            columns: Column labels found (stored as ``[]`` when omitted).
            error: Optional error description.
        """
        self.log["sources"][name] = {
            "status": status,
            "rows_scraped": rows,
            "columns_found": columns or [],
            "error": error
        }
        self._refresh_status()

    def _refresh_status(self):
        """Derive the run status from the recorded sources and errors."""
        # An explicit error (add_error) is sticky: nothing downgrades it.
        if self.log["errors"]:
            self.log["status"] = "failure"
            return

        statuses = [entry["status"] for entry in self.log["sources"].values()]
        failed = sum(1 for s in statuses if s == "failure")
        # Counting failures (instead of toggling on each call) keeps the result
        # independent of the order and number of log_source calls.
        if failed == 0:
            self.log["status"] = "success"
        elif failed == len(statuses):
            self.log["status"] = "failure"
        else:
            self.log["status"] = "partial"

    def add_warning(self, msg):
        """Append a warning message; the run status is left unchanged."""
        self.log["warnings"].append(msg)

    def add_error(self, msg):
        """Append an error message and mark the whole run as ``"failure"``."""
        self.log["errors"].append(msg)
        self.log["status"] = "failure"

    def save(self, path="logs/scrape_log.json", max_entries=90):
        """Append this run to the JSON history at ``path``, keeping the newest entries.

        Args:
            path: History file. Its parent directory is created when needed; a bare
                filename (no directory part) is written to the current directory.
            max_entries: Maximum number of runs retained in the file (must be >= 1).

        Raises:
            ValueError: If ``max_entries`` is smaller than 1.
        """
        if max_entries < 1:
            raise ValueError("max_entries must be at least 1")

        # os.makedirs("") raises, so only create a directory when the path has one.
        log_dir = os.path.dirname(path)
        if log_dir:
            os.makedirs(log_dir, exist_ok=True)

        try:
            with open(path, 'r', encoding='utf-8') as f:
                history = json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            history = []
        # A file holding valid JSON that is not a list cannot be appended to;
        # treat it like a corrupt file and start a fresh history.
        if not isinstance(history, list):
            history = []

        history.append(self.log)
        history = history[-max_entries:]

        with open(path, 'w', encoding='utf-8') as f:
            json.dump(history, f, indent=2)

def save_html_snapshot(source_name, html_content, keep=5):
    """Write ``html_content`` to a timestamped snapshot file and prune old snapshots.

    Snapshots are stored as ``logs/html_snapshots/<source_name>_<YYYYmmdd_HHMMSS>.html``
    (local time from ``datetime.now()``).

    Args:
        source_name: Source identifier used as the filename prefix.
        html_content: Text written to the snapshot (UTF-8).
        keep: Number of newest snapshots of this source to retain. Values below 1
            are treated as 1 so the snapshot just written is never removed.
    """
    snapshot_dir = "logs/html_snapshots"
    stamp_format = "%Y%m%d_%H%M%S"
    prefix = f"{source_name}_"
    suffix = ".html"

    os.makedirs(snapshot_dir, exist_ok=True)
    timestamp = datetime.now().strftime(stamp_format)
    filepath = f"{snapshot_dir}/{source_name}_{timestamp}.html"
    with open(filepath, 'w', encoding='utf-8') as f:
        f.write(html_content)

    def _belongs_to_source(filename):
        # A plain startswith(source_name) also matches other sources that share
        # the prefix (e.g. "zse-old_..." when pruning "zse"); require the exact
        # "<source>_<timestamp>.html" shape instead.
        if not (filename.startswith(prefix) and filename.endswith(suffix)):
            return False
        try:
            datetime.strptime(filename[len(prefix):-len(suffix)], stamp_format)
        except ValueError:
            return False
        return True

    # The timestamp format sorts lexicographically in chronological order.
    snapshots = sorted(f for f in os.listdir(snapshot_dir) if _belongs_to_source(f))
    for old_snapshot in snapshots[:-max(1, keep)]:
        os.remove(os.path.join(snapshot_dir, old_snapshot))

# Single ScrapeLogger instance that the scraper functions and the update/save
# cell record into. Its warnings/errors lists only grow, so re-running later
# cells without re-running this one keeps adding to the same run record.
logger = ScrapeLogger()

In [141]:
# Pages scraped by this notebook.
# url  -> requested by get_todays_data_zse
# url2 -> requested by get_rates
# url3 -> requested by get_todays_data_vic
url = "https://www.zse.co.zw/price-sheet/"
# NOTE(review): the srsltid query parameter looks like a search-result tracking id - confirm whether it is needed.
url2 = "https://zimpricecheck.com/price-updates/official-and-black-market-exchange-rates/?srsltid=AfmBOoo-30J1RAcbr6OMk7Z-R0rUF_sH7WBp97Qt1O3C4FrP8n7cXhj_"
url3 = "https://www.vfex.exchange/price-sheet/"

In [None]:
def get_todays_data_zse():
    """Download the ZSE price sheet and return its first HTML table as a DataFrame.

    The first table found on the page is used. Its first row is promoted to the
    column header and every row containing a missing value is dropped. Expected
    columns that are missing, and unexpected extra columns, are logged as
    warnings only; the source is still logged as a success in that case.

    Every failure is recorded through ``logger.log_source`` before the exception
    propagates, and an HTML snapshot is saved whenever a response body exists.

    Returns:
        pandas.DataFrame: The cleaned price-sheet table.

    Raises:
        requests.exceptions.RequestException: On timeout, HTTP error status or
            any other network failure.
        ValueError: If the page has no HTML table or the table is empty.
    """
    source_name = "zse"
    response = None
    try:
        response = requests.get(url, timeout=30)
        response.raise_for_status()

        try:
            data = pd.read_html(StringIO(response.text))
        except ValueError as e:
            if "No tables found" in str(e):
                logger.log_source(source_name, "failure", error="No tables found in HTML (website structure may have changed)")
                save_html_snapshot(source_name, response.text)
                raise ValueError("ZSE: No tables found in HTML") from e
            raise

        df = pd.DataFrame(data[0])

        if df.empty:
            logger.log_source(source_name, "failure", error="Table is empty")
            save_html_snapshot(source_name, response.text)
            raise ValueError("ZSE table is empty")

        # The header arrives as the first data row: promote it, then drop it.
        df.columns = df.iloc[0]
        df = df[1:]
        df = df.dropna()

        found_cols = list(df.columns)
        missing_cols = [c for c in EXPECTED_COLUMNS_ZSE if c not in found_cols]
        if missing_cols:
            logger.add_warning(f"ZSE missing columns: {missing_cols}")
            save_html_snapshot(source_name, response.text)

        extra_cols = [c for c in found_cols if c not in EXPECTED_COLUMNS_ZSE]
        if extra_cols:
            logger.add_warning(f"ZSE new columns detected: {extra_cols}")

        logger.log_source(source_name, "success", rows=len(df), columns=found_cols)
        return df

    except requests.exceptions.Timeout:
        logger.log_source(source_name, "failure", error="Request timeout after 30s")
        raise
    except requests.exceptions.HTTPError as e:
        logger.log_source(source_name, "failure", error=f"HTTP {e.response.status_code}: {e.response.reason}")
        # A Response is falsy for 4xx/5xx statuses (its truth value is `.ok`),
        # so test identity: otherwise the error page would never be snapshotted.
        if response is not None:
            save_html_snapshot(source_name, response.text)
        raise
    except requests.exceptions.RequestException as e:
        logger.log_source(source_name, "failure", error=f"Network error: {str(e)}")
        raise

def get_open_price_zse():
    """Return today's ZSE opening prices as a single-row frame.

    Columns are the company names from the scraped sheet; the one row is
    labelled (index name ``'Date'``) with the timezone-aware Africa/Harare
    timestamp taken at call time.
    """
    sheet = get_todays_data_zse()
    # Transpose so companies run across the columns, promote the
    # 'Company Name' row to the header, and keep only the price row.
    wide = sheet[['Company Name', 'Opening Price']].T
    wide.columns = wide.iloc[0]
    wide = wide[1:]
    wide.index.name = 'Date'
    scraped_at = datetime.now(pytz.timezone('Africa/Harare'))
    return wide.rename(index={wide.index[0]: scraped_at})

def get_close_price_zse():
    """Return today's ZSE closing prices as a single-row frame.

    Columns are the company names from the scraped sheet; the one row is
    labelled (index name ``'Date'``) with the timezone-aware Africa/Harare
    timestamp taken at call time.
    """
    sheet = get_todays_data_zse()
    # Transpose, use the 'Company Name' row as the header, keep the price row.
    wide = sheet[['Company Name', 'Closing Price']].T
    wide.columns = wide.iloc[0]
    wide = wide[1:]
    wide.index.name = 'Date'
    scraped_at = datetime.now(pytz.timezone('Africa/Harare'))
    return wide.rename(index={wide.index[0]: scraped_at})

def get_vol_traded_zse():
    """Return today's ZSE total traded volumes as a single-row frame.

    Columns are the company names from the scraped sheet; the one row is
    labelled (index name ``'Date'``) with the timezone-aware Africa/Harare
    timestamp taken at call time.
    """
    sheet = get_todays_data_zse()
    # Transpose, use the 'Company Name' row as the header, keep the volume row.
    wide = sheet[['Company Name', 'Total Traded Volume']].T
    wide.columns = wide.iloc[0]
    wide = wide[1:]
    wide.index.name = 'Date'
    scraped_at = datetime.now(pytz.timezone('Africa/Harare'))
    return wide.rename(index={wide.index[0]: scraped_at})

In [None]:
def get_todays_data_vic():
    """Download the VFEX price sheet and return its first HTML table as a DataFrame.

    The first table found on the page is used. Its first row is promoted to the
    column header and every row containing a missing value is dropped. Expected
    columns that are missing, and unexpected extra columns, are logged as
    warnings only; the source is still logged as a success in that case.

    Every failure is recorded through ``logger.log_source`` before the exception
    propagates, and an HTML snapshot is saved whenever a response body exists.

    Returns:
        pandas.DataFrame: The cleaned price-sheet table.

    Raises:
        requests.exceptions.RequestException: On timeout, HTTP error status or
            any other network failure.
        ValueError: If the page has no HTML table or the table is empty.
    """
    source_name = "vfex"
    response = None
    try:
        response = requests.get(url3, timeout=30)
        response.raise_for_status()

        try:
            data = pd.read_html(StringIO(response.text))
        except ValueError as e:
            if "No tables found" in str(e):
                logger.log_source(source_name, "failure", error="No tables found in HTML (website structure may have changed)")
                save_html_snapshot(source_name, response.text)
                raise ValueError("VFEX: No tables found in HTML") from e
            raise

        df = pd.DataFrame(data[0])

        if df.empty:
            logger.log_source(source_name, "failure", error="Table is empty")
            save_html_snapshot(source_name, response.text)
            raise ValueError("VFEX table is empty")

        # The header arrives as the first data row: promote it, then drop it.
        df.columns = df.iloc[0]
        df = df[1:]
        df = df.dropna()

        found_cols = list(df.columns)
        missing_cols = [c for c in EXPECTED_COLUMNS_VIC if c not in found_cols]
        if missing_cols:
            logger.add_warning(f"VFEX missing columns: {missing_cols}")
            save_html_snapshot(source_name, response.text)

        extra_cols = [c for c in found_cols if c not in EXPECTED_COLUMNS_VIC]
        if extra_cols:
            logger.add_warning(f"VFEX new columns detected: {extra_cols}")

        logger.log_source(source_name, "success", rows=len(df), columns=found_cols)
        return df

    except requests.exceptions.Timeout:
        logger.log_source(source_name, "failure", error="Request timeout after 30s")
        raise
    except requests.exceptions.HTTPError as e:
        logger.log_source(source_name, "failure", error=f"HTTP {e.response.status_code}: {e.response.reason}")
        # A Response is falsy for 4xx/5xx statuses (its truth value is `.ok`),
        # so test identity: otherwise the error page would never be snapshotted.
        if response is not None:
            save_html_snapshot(source_name, response.text)
        raise
    except requests.exceptions.RequestException as e:
        logger.log_source(source_name, "failure", error=f"Network error: {str(e)}")
        raise

def get_open_price_vic():
    """Return today's VFEX opening prices as a single-row frame.

    Columns are the company names from the scraped sheet; the one row is
    labelled (index name ``'Date'``) with the timezone-aware Africa/Harare
    timestamp taken at call time.
    """
    sheet = get_todays_data_vic()
    # Transpose so companies run across the columns, promote the
    # 'Company Name' row to the header, and keep only the price row.
    wide = sheet[['Company Name', 'Opening Price']].T
    wide.columns = wide.iloc[0]
    wide = wide[1:]
    wide.index.name = 'Date'
    scraped_at = datetime.now(pytz.timezone('Africa/Harare'))
    return wide.rename(index={wide.index[0]: scraped_at})

def get_close_price_vic():
    """Return today's VFEX closing prices as a single-row frame.

    Columns are the company names from the scraped sheet; the one row is
    labelled (index name ``'Date'``) with the timezone-aware Africa/Harare
    timestamp taken at call time.
    """
    sheet = get_todays_data_vic()
    # Transpose, use the 'Company Name' row as the header, keep the price row.
    wide = sheet[['Company Name', 'Closing Price']].T
    wide.columns = wide.iloc[0]
    wide = wide[1:]
    wide.index.name = 'Date'
    scraped_at = datetime.now(pytz.timezone('Africa/Harare'))
    return wide.rename(index={wide.index[0]: scraped_at})

def get_vol_traded_vic():
    """Return today's VFEX total traded volumes as a single-row frame.

    Columns are the company names from the scraped sheet; the one row is
    labelled (index name ``'Date'``) with the timezone-aware Africa/Harare
    timestamp taken at call time.
    """
    sheet = get_todays_data_vic()
    # Transpose, use the 'Company Name' row as the header, keep the volume row.
    wide = sheet[['Company Name', 'Total Traded Volume']].T
    wide.columns = wide.iloc[0]
    wide = wide[1:]
    wide.index.name = 'Date'
    scraped_at = datetime.now(pytz.timezone('Africa/Harare'))
    return wide.rename(index={wide.index[0]: scraped_at})

In [None]:
def get_rates():
    """Scrape the exchange-rate page and return today's USD/ZiG rates as one row.

    The first HTML table on the page is read. The numeric part of its ``Value``
    column is extracted into a float ``value`` column, and three rows are looked
    up by their ``Rate`` label. A label that is absent yields ``pd.NA`` for that
    rate plus a logged warning; the source is still logged as a success.

    Returns:
        pandas.DataFrame: One row with columns ``Date`` (timezone-aware
        Africa/Harare timestamp), ``USA DOLLAR``, ``Lowest Informal Sector Rate``
        and ``Highest Informal Sector Rate``.

    Raises:
        requests.exceptions.RequestException: On timeout, HTTP error status or
            any other network failure.
        ValueError: If the page has no HTML table or the table is empty.
        KeyError: If the table lacks the ``Value`` or ``Rate`` column.
    """
    source_name = "rates"
    response = None
    try:
        response = requests.get(url2, timeout=30)
        response.raise_for_status()

        try:
            data = pd.read_html(StringIO(response.text))
        except ValueError as e:
            if "No tables found" in str(e):
                logger.log_source(source_name, "failure", error="No tables found in HTML (website structure may have changed)")
                save_html_snapshot(source_name, response.text)
                raise ValueError("Rates: No tables found in HTML") from e
            raise

        df = pd.DataFrame(data[0])

        if df.empty:
            logger.log_source(source_name, "failure", error="Table is empty")
            save_html_snapshot(source_name, response.text)
            raise ValueError("Rates table is empty")

        # Keep only the first run of digits (with optional decimals) from each cell.
        # NOTE(review): a value written with thousands separators ("1,234.5")
        # would be read as 1.0 by this pattern - confirm the site never does that.
        df['value'] = df['Value'].str.extract(r'(\d+\.?\d*)').astype(float)
        df = df.drop(columns=['Value'])

        def _rate_for(label, missing_warning):
            # First value whose 'Rate' cell equals `label`; pd.NA (plus a warning) if none.
            matches = df.loc[df['Rate'] == label, 'value'].values
            if len(matches) == 0:
                logger.add_warning(missing_warning)
                return pd.NA
            return matches[0]

        usd_rate = _rate_for('1 USD to ZiG', "Rates: USD rate not found")
        lowest_rate = _rate_for('1 USD to ZiG Lowest Informal Sector Rate', "Rates: Lowest informal rate not found")
        highest_rate = _rate_for('1 USD to ZiG Highest Informal Sector Rate', "Rates: Highest informal rate not found")

        rates = pd.DataFrame([{
            'Date': datetime.now(pytz.timezone('Africa/Harare')),
            'USA DOLLAR': usd_rate,
            'Lowest Informal Sector Rate': lowest_rate,
            'Highest Informal Sector Rate': highest_rate
        }])

        logger.log_source(source_name, "success", rows=1, columns=['USA DOLLAR', 'Lowest Informal Sector Rate', 'Highest Informal Sector Rate'])
        return rates

    except requests.exceptions.Timeout:
        logger.log_source(source_name, "failure", error="Request timeout after 30s")
        raise
    except requests.exceptions.HTTPError as e:
        logger.log_source(source_name, "failure", error=f"HTTP {e.response.status_code}: {e.response.reason}")
        # A Response is falsy for 4xx/5xx statuses (its truth value is `.ok`),
        # so test identity: otherwise the error page would never be snapshotted.
        if response is not None:
            save_html_snapshot(source_name, response.text)
        raise
    except requests.exceptions.RequestException as e:
        logger.log_source(source_name, "failure", error=f"Network error: {str(e)}")
        raise
    except KeyError as e:
        logger.log_source(source_name, "failure", error=f"Missing column: {str(e)}")
        if response is not None:
            save_html_snapshot(source_name, response.text)
        raise

In [176]:
# # code to retrieve existing json files with Zim Stock Data for updating
# open_json_zse = pd.read_json('/Users/teekaynium/Desktop/open_price_zse.json', orient = 'split')
# close_json_zse = pd.read_json('/Users/teekaynium/Desktop/close_price_zse.json', orient = 'split')
# vol_json_zse = pd.read_json('/Users/teekaynium/Desktop/vol_traded_zse.json', orient = 'split')
# rates_json = pd.read_json('/Users/teekaynium/Desktop/rates.json', orient = 'split')
# open_json_vic = pd.read_json('/Users/teekaynium/Desktop/open_price_vic.json', orient = 'split')
# close_json_vic = pd.read_json('/Users/teekaynium/Desktop/close_price_vic.json', orient = 'split')
# vol_json_vic = pd.read_json('/Users/teekaynium/Desktop/vol_traded_vic.json', orient = 'split')

In [178]:
# Load the previously archived frames (JSON written with orient='split') from
# the relative 'archive-single-file' directory. update_data() appends today's
# scraped rows to these module-level frames, so this cell must run before it,
# and every file must already exist or pd.read_json raises.
open_json_zse = pd.read_json('archive-single-file/open_price_zse.json', orient = 'split')
close_json_zse = pd.read_json('archive-single-file/close_price_zse.json', orient = 'split')
vol_json_zse = pd.read_json('archive-single-file/vol_traded_zse.json', orient = 'split')
rates_json = pd.read_json('archive-single-file/rates.json', orient = 'split')
open_json_vic = pd.read_json('archive-single-file/open_price_vic.json', orient = 'split')
close_json_vic = pd.read_json('archive-single-file/close_price_vic.json', orient = 'split')
vol_json_vic = pd.read_json('archive-single-file/vol_traded_vic.json', orient = 'split')

In [None]:
def update_data():
    """Scrape all three sources and append today's rows to the archived frames.

    Each source group (ZSE, exchange rates, VFEX) is handled independently: a
    failure in one is recorded with ``logger.add_error`` and leaves that group's
    entries as ``None`` without stopping the others. Within an exchange, the
    open/close/volume frames are committed together only after all three were
    built, so a mid-way failure cannot leave the exchange partly updated.

    Returns:
        dict: Keys ``open_price_zse``, ``close_price_zse``, ``vol_traded_zse``,
        ``rates``, ``open_price_vic``, ``close_price_vic``, ``vol_traded_vic``;
        each value is the concatenated DataFrame, or ``None`` if its group failed.
    """
    results = {
        'open_price_zse': None,
        'close_price_zse': None,
        'vol_traded_zse': None,
        'rates': None,
        'open_price_vic': None,
        'close_price_vic': None,
        'vol_traded_vic': None,
    }

    # ZSE update. Each getter downloads the sheet itself, so no separate
    # up-front fetch is needed.
    try:
        open_zse = pd.concat([open_json_zse, get_open_price_zse()], axis=0)
        close_zse = pd.concat([close_json_zse, get_close_price_zse()], axis=0)
        vol_zse = pd.concat([vol_json_zse, get_vol_traded_zse()], axis=0)
    except Exception as e:
        logger.add_error(f"ZSE update failed: {type(e).__name__}: {str(e)}")
    else:
        results['open_price_zse'] = open_zse
        results['close_price_zse'] = close_zse
        results['vol_traded_zse'] = vol_zse

    # Exchange rate update
    try:
        results['rates'] = pd.concat([rates_json, get_rates()], axis=0)
    except Exception as e:
        logger.add_error(f"Rates update failed: {type(e).__name__}: {str(e)}")

    # VFEX update (same all-or-nothing commit as ZSE)
    try:
        open_vic = pd.concat([open_json_vic, get_open_price_vic()], axis=0)
        close_vic = pd.concat([close_json_vic, get_close_price_vic()], axis=0)
        vol_vic = pd.concat([vol_json_vic, get_vol_traded_vic()], axis=0)
    except Exception as e:
        logger.add_error(f"VFEX update failed: {type(e).__name__}: {str(e)}")
    else:
        results['open_price_vic'] = open_vic
        results['close_price_vic'] = close_vic
        results['vol_traded_vic'] = vol_vic

    return results

In [None]:
def save_data(results, output_dir='archive-single-file'):
    """Write every available frame in ``results`` to ``<output_dir>/<key>.json``.

    Files are written with ``orient='split'`` and ISO-formatted dates. Entries
    that are ``None`` or absent from ``results`` are skipped.

    Args:
        results: Mapping from archive name (``open_price_zse``,
            ``close_price_zse``, ``vol_traded_zse``, ``rates``, ``open_price_vic``,
            ``close_price_vic``, ``vol_traded_vic``) to a DataFrame or ``None``.
        output_dir: Directory receiving the JSON files; created on the first
            write if it does not exist.

    Returns:
        int: Number of files written.
    """
    archive_names = (
        'open_price_zse',
        'close_price_zse',
        'vol_traded_zse',
        'rates',
        'open_price_vic',
        'close_price_vic',
        'vol_traded_vic',
    )

    saved_count = 0
    for name in archive_names:
        frame = results.get(name)
        if frame is None:
            continue
        # Created lazily so a run with nothing to save leaves the disk untouched.
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)
        frame.to_json(os.path.join(output_dir, f'{name}.json'), orient='split', date_format='iso')
        saved_count += 1

    return saved_count

# Run one scrape -> update -> save pass. The run log is written in every case,
# including when an unexpected exception aborts the pass.
try:
    results = update_data()
    saved = save_data(results)
    expected = len(results)  # one archive file per entry in the results dict

    if saved == 0:
        logger.add_error("No data was saved - all sources failed")
    elif saved < expected:
        logger.add_warning(f"Partial save: {saved}/{expected} files updated")
except Exception as e:
    # Record why the pass died so the saved log explains the failure, then re-raise.
    logger.add_error(f"Run aborted: {type(e).__name__}: {str(e)}")
    raise
finally:
    logger.save()

In [185]:
# def save_data(open_price_zse: pd.DataFrame, close_price_zse: pd.DataFrame, vol_traded_zse: pd.DataFrame, rates: pd.DataFrame,
#               open_price_vic: pd.DataFrame, close_price_vic: pd.DataFrame, vol_traded_vic: pd.DataFrame):
#     """ function to save updated dataframes as json files"""
#     # Save the DataFrames to JSON files with the timestamped directory
#     # ZSE update
#     open_price_zse.to_json('/Users/teekaynium/Desktop/open_price_zse.json', orient='split', date_format='iso')
#     close_price_zse.to_json('/Users/teekaynium/Desktop/close_price_zse.json', orient='split', date_format='iso')
#     vol_traded_zse.to_json('/Users/teekaynium/Desktop/vol_traded_zse.json', orient='split', date_format='iso')

#     # Exchange rates update
#     rates.to_json('/Users/teekaynium/Desktop/rates.json', orient='split', date_format='iso')

#     # VFEX update
#     open_price_vic.to_json('/Users/teekaynium/Desktop/open_price_vic.json', orient='split', date_format='iso')
#     close_price_vic.to_json('/Users/teekaynium/Desktop/close_price_vic.json', orient='split', date_format='iso')
#     vol_traded_vic.to_json('/Users/teekaynium/Desktop/vol_traded_vic.json', orient='split', date_format='iso')
    

# save_data(update_data()[0], update_data()[1] , update_data()[2], update_data()[3], update_data()[4] , update_data()[5], update_data()[6])

In [194]:
# def save_data(rates: pd.DataFrame, open_price_vic: pd.DataFrame, close_price_vic: pd.DataFrame, vol_traded_vic: pd.DataFrame):
#     """ function to save updated dataframes as json files"""
#     # # Save the DataFrames to JSON files with the timestamped directory
#     # # ZSE update
#     # open_price_zse.to_json('/Users/teekaynium/Desktop/open_price_zse.json', orient='split', date_format='iso')
#     # close_price_zse.to_json('/Users/teekaynium/Desktop/close_price_zse.json', orient='split', date_format='iso')
#     # vol_traded_zse.to_json('/Users/teekaynium/Desktop/vol_traded_zse.json', orient='split', date_format='iso')

#     # Exchange rates update
#     rates.to_json('/Users/teekaynium/Desktop/rates.json', orient='split', date_format='iso')

#     # VFEX update
#     open_price_vic.to_json('/Users/teekaynium/Desktop/open_price_vic.json', orient='split', date_format='iso')
#     close_price_vic.to_json('/Users/teekaynium/Desktop/close_price_vic.json', orient='split', date_format='iso')
#     vol_traded_vic.to_json('/Users/teekaynium/Desktop/vol_traded_vic.json', orient='split', date_format='iso')
    

# save_data(get_rates(), get_open_price_vic(), get_open_price_vic() , get_open_price_vic())