In [0]:
# Databricks Notebook: Simplified Web Scraping for IBEX Day-Ahead Prices and Volumes
# Purpose: Scrape three tables, clean data, and store as Databricks tables.

# Install required libraries (minimal set)
%pip install requests beautifulsoup4 pandas

# Import libraries
import requests
from bs4 import BeautifulSoup
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from datetime import datetime

# Initialize Spark session
spark = SparkSession.builder.appName("IBEX_Web_Scraping_Simple").getOrCreate()

# Set up database
database_name = "ibex_scraping_simple"
spark.sql(f"CREATE DATABASE IF NOT EXISTS {database_name}")
spark.sql(f"USE {database_name}")

# Verify permissions
try:
    spark.sql(f"CREATE TABLE IF NOT EXISTS {database_name}.test_perm (id INT)")
    spark.sql(f"DROP TABLE {database_name}.test_perm")
    print("Database permissions verified.")
except Exception as e:
    raise Exception(f"Permission issue with database {database_name}: {e}")

# Generate timestamp for table names
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

# Step 1: Web scrape using requests
url = "https://ibex.bg/markets/dam/day-ahead-prices-and-volumes-v2-0-2/"
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
    "Accept-Language": "en-US,en;q=0.5",
    "Connection": "keep-alive"
}

response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status()
soup = BeautifulSoup(response.content, "html.parser")
tables = soup.find_all("table")

if len(tables) < 3:
    raise Exception(f"Expected 3 tables, found {len(tables)}")
print(f"Successfully fetched webpage with {len(tables)} tables.")

# Step 2: Extract and clean data
dataframes = []
for i, table in enumerate(tables[:3]):
    rows = table.find_all("tr")
    headers = [th.get_text(strip=True) for th in rows[0].find_all("th")]
    
    # Handle empty/duplicate headers
    if not headers or all(h == "" for h in headers):
        num_cols = len(rows[1].find_all("td"))
        headers = [f"col_{j}" for j in range(num_cols)]
    else:
        seen = set()
        new_headers = []
        for j, h in enumerate(headers):
            h_clean = h if h else f"col_{j}"
            while h_clean in seen:
                h_clean = f"col_{j}"
                j += 1
            new_headers.append(h_clean)
            seen.add(h_clean)
        headers = new_headers
    
    data = []
    for row in rows[1:]:
        cols = [td.get_text(strip=True) for td in row.find_all("td")]
        if len(cols) == len(headers):
            data.append(cols)
    
    # Create DataFrame
    df = pd.DataFrame(data, columns=headers)
    if df.empty:
        print(f"Warning: Table {i+1} is empty after extraction.")
        continue
    
    # Clean the DataFrame
    df = df.dropna(how="all")
    for col in df.columns:
        # Handle numbers (e.g., "1,234.56" or "1 234,56")
        df[col] = df[col].str.replace(" ", "").str.replace(",", ".")
        # Remove currency symbols
        df[col] = df[col].str.replace(r"[€BGN]", "", regex=True)
        try:
            if df[col].str.replace(".", "").str.isnumeric().all():
                df[col] = df[col].astype(float)
        except (AttributeError, ValueError):
            pass
        # Handle dates
        try:
            if df[col].str.match(r"\d{2}\.\d{2}\.\d{4}").all():
                df[col] = pd.to_datetime(df[col], format="%d.%m.%Y")
            elif df[col].str.match(r"\d{4}-\d{2}-\d{2}").all():
                df[col] = pd.to_datetime(df[col])
        except (AttributeError, ValueError):
            pass
    
    dataframes.append(df)
    print(f"Table {i+1} shape: {df.shape}")
    display(df.head())

if len(dataframes) != 3:
    raise Exception(f"Expected 3 DataFrames, got {len(dataframes)}")

# Step 3: Save as Databricks tables
table_names = [
    f"ibex_table_1_{timestamp}",
    f"ibex_table_2_{timestamp}",
    f"ibex_table_3_{timestamp}"
]

# Drop existing tables
for table in table_names:
    spark.sql(f"DROP TABLE IF EXISTS {database_name}.{table}")

# Save tables
for i, df in enumerate(dataframes):
    schema = StructType([
        StructField(col.replace(" ", "_").replace(".", "_").replace("(", "").replace(")", ""), StringType(), True) for col in df.columns
    ])
    
    spark_df = spark.createDataFrame(df, schema=schema)
    spark_df.write.mode("overwrite").saveAsTable(f"{database_name}.{table_names[i]}")
    print(f"Saved table: {database_name}.{table_names[i]}")

# Step 4: Verify tables
for table in table_names:
    count = spark.table(f"{database_name}.{table}").count()
    print(f"Table {database_name}.{table} has {count} rows")

print("Listing tables in database:")
spark.sql(f"SHOW TABLES IN {database_name} LIKE 'ibex_table_%'").show()

print("Web scraping and table storage completed successfully!")

Python interpreter will be restarted.
Python interpreter will be restarted.




Database permissions verified.
Successfully fetched webpage with 4 tables.


  if df[col].str.replace(".", "").str.isnumeric().all():


Table 1 shape: (2, 8)


col_0,col_1,col_2,col_3,col_4,col_5,col_6,col_7
Prices(EUR/MWh),,,,,,,
Volume(MWh),,,,,,,


Table 2 shape: (3, 8)


  if df[col].str.replace(".", "").str.isnumeric().all():


col_0,col_1,col_2,col_3,col_4,col_5,col_6,col_7
ase(01-24),,,,,,,
Peak(9-20),,,,,,,
Off-Peak(1-8&21-24),,,,,,,


  if df[col].str.replace(".", "").str.isnumeric().all():


Table 3 shape: (25, 9)


col_0,col_1,col_2,col_3,col_4,col_5,col_6,col_7,col_8
0-1,EUR/MWh,,,,,,,
1-2,EUR/MWh,,,,,,,
2-3,EUR/MWh,,,,,,,
2-3A,EUR/MWh,,,,,,,
3-4,EUR/MWh,,,,,,,


Saved table: ibex_scraping_simple.ibex_table_1_20250611_104030
Saved table: ibex_scraping_simple.ibex_table_2_20250611_104030
Saved table: ibex_scraping_simple.ibex_table_3_20250611_104030
Table ibex_scraping_simple.ibex_table_1_20250611_104030 has 2 rows
Table ibex_scraping_simple.ibex_table_2_20250611_104030 has 3 rows
Table ibex_scraping_simple.ibex_table_3_20250611_104030 has 25 rows
Listing tables in database:
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+

Web scraping and table storage completed successfully!


In [0]:
# Databricks Notebook: Simplified Web Scraping for IBEX Day-Ahead Prices and Volumes
# Purpose: Scrape three tables, clean data, and store as Databricks tables.

# Install required libraries
%pip install requests beautifulsoup4 pandas

# Import libraries
import requests
from bs4 import BeautifulSoup
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from datetime import datetime

# Initialize Spark session
spark = SparkSession.builder.appName("IBEX_Web_Scraping_Simple").getOrCreate()

# Set up a fresh database with timestamp to avoid conflicts
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
database_name = f"ibex_scraping_{timestamp}"
spark.sql(f"CREATE DATABASE IF NOT EXISTS {database_name}")
spark.sql(f"USE {database_name}")

# Verify permissions
try:
    spark.sql(f"CREATE TABLE IF NOT EXISTS {database_name}.test_perm (id INT)")
    spark.sql(f"DROP TABLE {database_name}.test_perm")
    print("Database permissions verified.")
except Exception as e:
    raise Exception(f"Permission issue with database {database_name}: {e}")

# Clear catalog cache to avoid metadata conflicts
spark.sql("CLEAR CACHE")

# Step 1: Web scrape using requests
url = "https://ibex.bg/markets/dam/day-ahead-prices-and-volumes-v2-0-2/"
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
    "Accept-Language": "en-US,en;q=0.5",
    "Connection": "keep-alive"
}

response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status()
soup = BeautifulSoup(response.content, "html.parser")
tables = soup.find_all("table")

if len(tables) < 3:
    raise Exception(f"Expected 3 tables, found {len(tables)}")
print(f"Successfully fetched webpage with {len(tables)} tables.")

# Step 2: Extract and clean data
dataframes = []
for i, table in enumerate(tables[:3]):
    rows = table.find_all("tr")
    headers = [th.get_text(strip=True) for th in rows[0].find_all("th")]
    
    # Handle empty/duplicate headers
    if not headers or all(h == "" for h in headers):
        num_cols = len(rows[1].find_all("td"))
        headers = [f"col_{j}" for j in range(num_cols)]
    else:
        seen = set()
        new_headers = []
        for j, h in enumerate(headers):
            h_clean = h if h else f"col_{j}"
            while h_clean in seen:
                h_clean = f"col_{j}"
                j += 1
            new_headers.append(h_clean)
            seen.add(h_clean)
        headers = new_headers
    
    data = []
    for row in rows[1:]:
        cols = [td.get_text(strip=True) for td in row.find_all("td")]
        if len(cols) == len(headers):
            data.append(cols)
    
    # Create DataFrame
    df = pd.DataFrame(data, columns=headers)
    if df.empty:
        print(f"Warning: Table {i+1} is empty after extraction.")
        continue
    
    # Clean the DataFrame
    df = df.dropna(how="all")
    for col in df.columns:
        # Handle numbers (e.g., "1,234.56" or "1 234,56")
        df[col] = df[col].str.replace(" ", "").str.replace(",", ".")
        # Remove currency symbols
        df[col] = df[col].str.replace(r"[€BGN]", "", regex=True)
        try:
            if df[col].str.replace(".", "").str.isnumeric().all():
                df[col] = df[col].astype(float)
        except (AttributeError, ValueError):
            pass
        # Handle dates
        try:
            if df[col].str.match(r"\d{2}\.\d{2}\.\d{4}").all():
                df[col] = pd.to_datetime(df[col], format="%d.%m.%Y")
            elif df[col].str.match(r"\d{4}-\d{2}-\d{2}").all():
                df[col] = pd.to_datetime(df[col])
        except (AttributeError, ValueError):
            pass
    
    dataframes.append(df)
    print(f"Table {i+1} shape: {df.shape}")
    display(df.head())

if len(dataframes) != 3:
    raise Exception(f"Expected 3 DataFrames, got {len(dataframes)}")

# Step 3: Save as Databricks tables
table_names = [
    f"table_1",
    f"table_2",
    f"table_3"
]

# Drop existing tables
for table in table_names:
    spark.sql(f"DROP TABLE IF EXISTS {database_name}.{table}")

# Save tables
for i, df in enumerate(dataframes):
    schema = StructType([
        StructField(col.replace(" ", "_").replace(".", "_").replace("(", "").replace(")", ""), StringType(), True) for col in df.columns
    ])
    
    spark_df = spark.createDataFrame(df, schema=schema)
    spark_df.write.mode("overwrite").saveAsTable(f"{database_name}.{table_names[i]}")
    print(f"Saved table: {database_name}.{table_names[i]}")

# Step 4: Verify tables
for table in table_names:
    count = spark.table(f"{database_name}.{table}").count()
    print(f"Table {database_name}.{table} has {count} rows")

# Fixed SHOW TABLES command
print("Listing tables in database:")
try:
    spark.sql(f"SHOW TABLES IN {database_name}").show()
except Exception as e:
    print(f"Error listing tables: {e}")
    # Fallback to list all databases and manually check
    spark.sql("SHOW DATABASES").show()
    print(f"Check if {database_name} exists in the above list.")

print("Web scraping and table storage completed successfully!")

Python interpreter will be restarted.
Python interpreter will be restarted.




Database permissions verified.
Successfully fetched webpage with 4 tables.
Table 1 shape: (2, 8)


  if df[col].str.replace(".", "").str.isnumeric().all():


col_0,col_1,col_2,col_3,col_4,col_5,col_6,col_7
Prices(EUR/MWh),,,,,,,
Volume(MWh),,,,,,,


  if df[col].str.replace(".", "").str.isnumeric().all():


Table 2 shape: (3, 8)


col_0,col_1,col_2,col_3,col_4,col_5,col_6,col_7
ase(01-24),,,,,,,
Peak(9-20),,,,,,,
Off-Peak(1-8&21-24),,,,,,,


  if df[col].str.replace(".", "").str.isnumeric().all():


Table 3 shape: (25, 9)


col_0,col_1,col_2,col_3,col_4,col_5,col_6,col_7,col_8
0-1,EUR/MWh,,,,,,,
1-2,EUR/MWh,,,,,,,
2-3,EUR/MWh,,,,,,,
2-3A,EUR/MWh,,,,,,,
3-4,EUR/MWh,,,,,,,


Saved table: ibex_scraping_20250611_104612.table_1
Saved table: ibex_scraping_20250611_104612.table_2
Saved table: ibex_scraping_20250611_104612.table_3
Table ibex_scraping_20250611_104612.table_1 has 2 rows
Table ibex_scraping_20250611_104612.table_2 has 3 rows
Table ibex_scraping_20250611_104612.table_3 has 25 rows
Listing tables in database:
+--------------------+---------+-----------+
|            database|tableName|isTemporary|
+--------------------+---------+-----------+
|ibex_scraping_202...|  table_1|      false|
|ibex_scraping_202...|  table_2|      false|
|ibex_scraping_202...|  table_3|      false|
+--------------------+---------+-----------+

Web scraping and table storage completed successfully!


In [0]:
%pip install requests beautifulsoup4 pandas lxml
%pip install lxml
 

import pandas as pd
import requests
import re
 
# STEP 3: Download the HTML page
url = "https://ibex.bg/markets/dam/day-ahead-prices-and-volumes-v2-0-2/"
headers = {"User-Agent": "Mozilla/5.0"}  # Pretend like a browser
response = requests.get(url, headers=headers)
html = response.content
 
# STEP 4: Read all tables from the page
tables = pd.read_html(html)
 
# STEP 5: Keep only the 3 real tables (skip tables[0])
table_peak_offpeak = tables[1]
table_hourly_raw = tables[2]
table_daily_summary = tables[3]
 
# STEP 6: Clean hourly table (alternate rows: prices and volumes)
hour_labels = table_hourly_raw.iloc[::2, 0].values
eur_prices = table_hourly_raw.iloc[::2, 1].values
volumes = table_hourly_raw.iloc[1::2, 1].values
 
table_hourly = pd.DataFrame({
    "Hour": hour_labels,
    "Price_EUR_per_MWh": eur_prices,
    "Volume_MWh": volumes
})
 
# STEP 7: Clean column names for Spark compatibility
def clean_column_names(df):
    df.columns = [
        re.sub(r"[^\w]", "_", col).strip("_")  # keep only letters, digits, underscore
        for col in df.columns
    ]
    return df
 
table_hourly = clean_column_names(table_hourly)
table_daily_summary = clean_column_names(table_daily_summary)
table_peak_offpeak = clean_column_names(table_peak_offpeak)
 
# STEP 8: Convert to Spark DataFrames
spark_df1 = spark.createDataFrame(table_hourly)
spark_df2 = spark.createDataFrame(table_daily_summary)
spark_df3 = spark.createDataFrame(table_peak_offpeak)
 
# STEP 9: Save as Databricks tables
spark_df1.write.mode("overwrite").saveAsTable("ibex_hourly_prices_volumes")
spark_df2.write.mode("overwrite").saveAsTable("ibex_daily_summary")
spark_df3.write.mode("overwrite").saveAsTable("ibex_peak_offpeak_summary")
 
 

Python interpreter will be restarted.
Python interpreter will be restarted.
Python interpreter will be restarted.
Python interpreter will be restarted.




In [0]:
display(spark.sql("SELECT * FROM ibex_hourly_prices_volumes"))

Hour,Price_EUR_per_MWh,Volume_MWh
0 - 1,EUR/MWh,MWh
1 - 2,EUR/MWh,MWh
2 - 3,EUR/MWh,MWh
2 - 3A,EUR/MWh,MWh
3 - 4,EUR/MWh,MWh
4 - 5,EUR/MWh,MWh
5 - 6,EUR/MWh,MWh
6 - 7,EUR/MWh,MWh
7 - 8,EUR/MWh,MWh
8 - 9,EUR/MWh,MWh


In [0]:
display(spark.sql("SELECT * FROM ibex_daily_summary"))

date,time,date_display,price_eur,price,volume
2025-06-05,00:00:00,05.06.2025,120.77,236.21,2330.2
2025-06-06,00:00:00,06.06.2025,103.57,202.57,2571.2
2025-06-07,00:00:00,07.06.2025,103.83,203.07,2284.9
2025-06-08,00:00:00,08.06.2025,96.01,187.78,2217.9
2025-06-09,00:00:00,09.06.2025,99.12,193.86,2201.8
2025-06-10,00:00:00,10.06.2025,89.91,175.85,2730.2
2025-06-11,00:00:00,11.06.2025,81.11,158.64,2249.4
2025-06-05,01:00:00,05.06.2025,108.77,212.74,2560.6
2025-06-06,01:00:00,06.06.2025,120.2,235.09,2810.0
2025-06-07,01:00:00,07.06.2025,99.5,194.61,2192.5


In [0]:
display(spark.sql("SELECT * FROM ibex_peak_offpeak_summary"))

Unnamed__0,Unnamed__1,Unnamed__2,Unnamed__3,Unnamed__4,Unnamed__5,Unnamed__6,Unnamed__7
Base (01-24),,,,,,,
Peak (9-20),,,,,,,
Off-Peak (1-8 & 21-24),,,,,,,


In [0]:
%pip install selenium==4.18.1

Python interpreter will be restarted.
Collecting selenium==4.18.1
  Downloading selenium-4.18.1-py3-none-any.whl (10.0 MB)
Collecting trio-websocket~=0.9
  Downloading trio_websocket-0.12.2-py3-none-any.whl (21 kB)
Collecting typing_extensions>=4.9.0
  Downloading typing_extensions-4.14.0-py3-none-any.whl (43 kB)
Collecting trio~=0.17
  Downloading trio-0.30.0-py3-none-any.whl (499 kB)
Collecting attrs>=23.2.0
  Downloading attrs-25.3.0-py3-none-any.whl (63 kB)
Collecting exceptiongroup
  Downloading exceptiongroup-1.3.0-py3-none-any.whl (16 kB)
Collecting sniffio>=1.3.0
  Downloading sniffio-1.3.1-py3-none-any.whl (10 kB)
Collecting outcome
  Downloading outcome-1.3.0.post0-py2.py3-none-any.whl (10 kB)
Collecting sortedcontainers
  Downloading sortedcontainers-2.4.0-py2.py3-none-any.whl (29 kB)
Collecting wsproto>=0.14
  Downloading wsproto-1.2.0-py3-none-any.whl (24 kB)
Collecting PySocks!=1.5.7,<2.0,>=1.5.6
  Downloading PySocks-1.7.1-py3-none-any.whl (16 kB)
Collecting h11<1,>=0.9.

In [0]:
%sh 
sudo apt-get update 
sudo apt-get install -y wget unzip 
wget https://dl.google.com/linux/direct/google-chrome-stable_current_amd64.deb 
sudo apt-get install -y ./google-chrome-stable_current_amd64.deb 
sudo apt-get install -y -f  # Fix any dependency issues


Hit:1 https://repos.azul.com/zulu/deb stable InRelease
Get:2 http://security.ubuntu.com/ubuntu focal-security InRelease [128 kB]
Hit:3 http://archive.ubuntu.com/ubuntu focal InRelease
Get:4 http://archive.ubuntu.com/ubuntu focal-updates InRelease [128 kB]
Get:5 http://security.ubuntu.com/ubuntu focal-security/universe amd64 Packages [1,308 kB]
Get:6 http://archive.ubuntu.com/ubuntu focal-backports InRelease [128 kB]
Get:7 http://archive.ubuntu.com/ubuntu focal-updates/main amd64 Packages [4,919 kB]
Get:8 http://security.ubuntu.com/ubuntu focal-security/main amd64 Packages [4,431 kB]
Get:9 http://archive.ubuntu.com/ubuntu focal-updates/universe amd64 Packages [1,599 kB]
Get:10 http://archive.ubuntu.com/ubuntu focal-updates/restricted amd64 Packages [4,998 kB]
Get:11 http://security.ubuntu.com/ubuntu focal-security/restricted amd64 Packages [4,801 kB]
Fetched 22.4 MB in 4s (6,133 kB/s)
Reading package lists...
Reading package lists...
Building dependency tree...
Reading state information

In [0]:
%sh
google-chrome --version

Google Chrome 137.0.7151.103 


In [0]:
%pip install webdriver-manager==4.0.2 typing_extensions==4.12.2

Python interpreter will be restarted.
Collecting webdriver-manager==4.0.2
  Downloading webdriver_manager-4.0.2-py2.py3-none-any.whl (27 kB)
Collecting typing_extensions==4.12.2
  Downloading typing_extensions-4.12.2-py3-none-any.whl (37 kB)
Collecting python-dotenv
  Downloading python_dotenv-1.1.0-py3-none-any.whl (20 kB)
Installing collected packages: python-dotenv, webdriver-manager, typing-extensions
  Attempting uninstall: typing-extensions
    Found existing installation: typing-extensions 4.14.0
    Uninstalling typing-extensions-4.14.0:
      Successfully uninstalled typing-extensions-4.14.0
Successfully installed python-dotenv-1.1.0 typing-extensions-4.12.2 webdriver-manager-4.0.2
Python interpreter will be restarted.


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType
from pyspark.sql.functions import col
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.chrome.options import Options
 
# Initialize Spark session
spark = SparkSession.builder.appName("WebScrapeTablesSelenium").getOrCreate()
 
# Function to clean numeric values
def clean_numeric(value):
    try:
        # Remove spaces and convert to float
        cleaned = value.replace(" ", "").replace(",", ".")
        return float(cleaned) if cleaned else 0.0
    except ValueError:
        return 0.0
 
# Function to scrape and parse table data using Selenium
def scrape_and_parse_table(url, table_index, table_name):
    # Set up Selenium with headless Chrome
    chrome_options = Options()
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--disable-dev-shm-usage")
    # Explicitly set Chrome binary path
    chrome_options.binary_location = "/usr/bin/google-chrome"
   
    # Initialize Chrome driver
    driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)
   
    try:
        # Load webpage
        driver.get(url)
        # Wait for JavaScript to render
        driver.implicitly_wait(10)
       
        # Get page source and parse with BeautifulSoup
        soup = BeautifulSoup(driver.page_source, "html.parser")
        tables = soup.find_all("table")
       
        # Debug: Print number of tables found
        print(f"Found {len(tables)} tables for {table_name}")
       
        if table_index >= len(tables):
            print(f"Table index {table_index} not found. Found {len(tables)} tables.")
            return None
       
        table = tables[table_index]
        rows = table.find_all("tr")
        headers = [th.get_text().strip().replace(" ", "_") for th in rows[0].find_all("th")][2:]  # Skip first two columns
       
        data = []
        if table_name == "prices_volumes":
            for row in rows[1:]:
                cells = [td.get_text().strip() for td in row.find_all("td")]
                metric = cells[0]
                values = cells[2:]  # Skip first two columns
                for date, value in zip(headers, values):
                    data.append((metric, date, clean_numeric(value)))
           
            schema = StructType([
                StructField("metric", StringType(), False),
                StructField("date", StringType(), False),
                StructField("value", FloatType(), False)
            ])
            df = spark.createDataFrame(data, schema)
            return df
       
        elif table_name == "block_products":
            for row in rows[1:]:
                cells = [td.get_text().strip() for td in row.find_all("td")]
                product = cells[0]
                values = cells[2:]  # Skip first two columns
                for date, value in zip(headers, values):
                    data.append((product, date, clean_numeric(value)))
           
            schema = StructType([
                StructField("product", StringType(), False),
                StructField("date", StringType(), False),
                StructField("price", FloatType(), False)
            ])
            df = spark.createDataFrame(data, schema)
            return df
       
        elif table_name == "hourly_products":
            current_hour = ""
            for row in rows[1:]:
                cells = [td.get_text().strip() for td in row.find_all("td")]
                if cells[0]:  # New hour
                    current_hour = cells[0]
                metric = cells[1]
                values = cells[2:]  # Skip first two columns
                for date, value in zip(headers, values):
                    data.append((current_hour, metric, date, clean_numeric(value)))
           
            schema = StructType([
                StructField("hour", StringType(), False),
                StructField("metric", StringType(), False),
                StructField("date", StringType(), False),
                StructField("value", FloatType(), False)
            ])
            df = spark.createDataFrame(data, schema)
            return df
   
    finally:
        driver.quit()
 
# URL to scrape
url = "https://ibex.bg/markets/dam/day-ahead-prices-and-volumes-v2-0-2/"
 
# Process and save each table
prices_volumes_df = scrape_and_parse_table(url, 0, "prices_volumes")
if prices_volumes_df:
    prices_volumes_df.write.mode("overwrite").format("delta").saveAsTable("prices_volumes_table")
    display(prices_volumes_df)
 
block_products_df = scrape_and_parse_table(url, 1, "block_products")
if block_products_df:
    block_products_df.write.mode("overwrite").format("delta").saveAsTable("block_products_table")
    display(block_products_df)
 
hourly_products_df = scrape_and_parse_table(url, 2, "hourly_products")
if hourly_products_df:
    hourly_products_df.write.mode("overwrite").format("delta").saveAsTable("hourly_products_table")
    display(hourly_products_df)
 

Found 4 tables for prices_volumes


metric,date,value
Prices (EUR/MWh),"Sat,_06/07",81.38
Prices (EUR/MWh),"Sun,_06/08",63.54
Prices (EUR/MWh),"Mon,_06/09",77.97
Prices (EUR/MWh),"Tue,_06/10",83.58
Prices (EUR/MWh),"Wed,_06/11",83.99
Prices (EUR/MWh),"Thu,_06/12",85.41
Volume (MWh),"Sat,_06/07",65968.3
Volume (MWh),"Sun,_06/08",72070.8
Volume (MWh),"Mon,_06/09",69105.4
Volume (MWh),"Tue,_06/10",69329.2


Found 4 tables for block_products


product,date,price
Base (01-24),"Sat,_06/07",81.38
Base (01-24),"Sun,_06/08",63.54
Base (01-24),"Mon,_06/09",77.97
Base (01-24),"Tue,_06/10",83.58
Base (01-24),"Wed,_06/11",83.99
Base (01-24),"Thu,_06/12",85.41
Peak (9-20),"Sat,_06/07",48.62
Peak (9-20),"Sun,_06/08",34.44
Peak (9-20),"Mon,_06/09",43.5
Peak (9-20),"Tue,_06/10",69.62


Found 4 tables for hourly_products


hour,metric,date,value
0 - 1,EUR/MWh,"Fri,_06/06",103.57
0 - 1,EUR/MWh,"Sat,_06/07",103.83
0 - 1,EUR/MWh,"Sun,_06/08",96.01
0 - 1,EUR/MWh,"Mon,_06/09",99.12
0 - 1,EUR/MWh,"Tue,_06/10",89.91
0 - 1,EUR/MWh,"Wed,_06/11",81.11
0 - 1,EUR/MWh,"Thu,_06/12",107.36
MWh,2571.2,"Fri,_06/06",2284.9
MWh,2571.2,"Sat,_06/07",2217.9
MWh,2571.2,"Sun,_06/08",2201.8
