In [None]:
import asyncio
import aiohttp
import re
import os
import hashlib
import pytz  # To manage time zones
from datetime import datetime
from typing import List, Tuple, Union, Optional
import polars as pl
from io import BytesIO
from azure.storage.blob import BlobServiceClient
from dotenv import load_dotenv

# Populate the process environment from a .env file (python-dotenv) so the
# os.environ lookup of CONN_STRING_AZURE_STORAGE later in the notebook can see it.
load_dotenv()

In [2]:
# Club display name -> URL slug. Each slug is appended to BASE_URL to build
# that club's team page URL; insertion order determines scraping/row order.
EPL_TEAM = {
    "AFC Bournemouth": "afc-bournemouth",
    "Arsenal": "arsenal",
    "Aston Villa": "aston-villa",
    "Brentford": "brentford",
    "Brighton & Hove Albion": "brighton-and-hove-albion",
    "Chelsea": "chelsea",
    "Crystal Palace": "crystal-palace",
    "Everton": "everton",
    "Fulham": "fulham",
    "Ipswich Town": "ipswich-town",
    "Leicester City": "leicester-city",
    "Liverpool": "liverpool",
    "Manchester City": "manchester-city",
    "Manchester United": "manchester-united",
    "Newcastle United": "newcastle-united",
    "Nottingham Forest": "nottingham-forest",
    "Southampton": "southampton",
    "Tottenham Hotspur": "tottenham-hotspur",
    "West Ham United": "west-ham-united",
    "Wolverhampton Wanderers": "wolverhampton-wanderers",
}

# 1-based page numbers of each team's listing.
PAGES_NUMBER = [1, 2]

# Root of the BBC Sport football team pages.
BASE_URL = "https://www.bbc.com/sport/football/teams"

In [3]:
def create_blob_client_with_connection_string(connection_string: str) -> BlobServiceClient:
    """
    Build a BlobServiceClient from an Azure Storage connection string.

    Every URL-encoded plus sign ("%2B") is decoded back to "+" before the
    client is created; no other escape sequence is touched.

    :param connection_string: The Azure Storage connection string
    :return: BlobServiceClient object
    """
    encoded_plus = re.compile(r'%2B')
    normalized = encoded_plus.sub('+', connection_string)
    return BlobServiceClient.from_connection_string(normalized)


def write_blob_to_container(df: pl.DataFrame, container_name: str, path_to_blob: str, blob_service_client: BlobServiceClient) -> None:
    """
    Writes a Polars DataFrame as a Parquet file to an Azure Blob Storage container.

    Any blob already stored at ``path_to_blob`` is overwritten.

    :param df: Polars DataFrame to write
    :param container_name: Name of the Azure Blob Storage container
    :param path_to_blob: Path to the blob in the container
    :param blob_service_client: BlobServiceClient object for Azure Blob Storage
    :raises Exception: Whatever the upload raised is logged and then re-raised,
        so the caller cannot mistake a failed write for a successful one.
    """
    parquet_buffer = from_polars_to_parquet(df)
    blob_client = blob_service_client.get_blob_client(container=container_name, blob=path_to_blob)
    try:
        blob_client.upload_blob(parquet_buffer.getvalue(), blob_type="BlockBlob", overwrite=True)
    except Exception as e:
        print(f"Error uploading blob to {container_name}/{path_to_blob}: {e}")
        # Propagate instead of swallowing: otherwise the run carries on and
        # reports success even though nothing was persisted.
        raise
    print(f"Successfully uploaded blob to {container_name}/{path_to_blob}")


def read_blob_from_container(container_name: str, path_to_blob: str, blob_service_client: BlobServiceClient) -> Union[pl.DataFrame, None]:
    """
    Reads a Parquet file from an Azure Blob Storage container and returns it as a Polars DataFrame.

    :param container_name: Name of the Azure Blob Storage container
    :param path_to_blob: Path to the blob in the container
    :param blob_service_client: BlobServiceClient object for Azure Blob Storage
    :return: Polars DataFrame read from the blob, or None if nothing exists at
        that path (HTTP 404)
    :raises Exception: Any other failure (authentication, network, unreadable
        Parquet, ...) is logged and re-raised. Returning None for those would make
        a caller that writes on None overwrite the stored data with a partial set.
    """
    blob_client = blob_service_client.get_blob_client(container=container_name, blob=path_to_blob)
    try:
        blob_data = blob_client.download_blob().readall()
        df = pl.read_parquet(BytesIO(blob_data))
    except Exception as e:
        # Azure SDK HTTP errors (azure.core HttpResponseError and subclasses such as
        # ResourceNotFoundError) expose the HTTP status as `status_code`; 404 means
        # the blob (or its container) simply does not exist yet.
        if getattr(e, "status_code", None) == 404:
            print(f"No blob found at {container_name}/{path_to_blob}: {e}")
            return None
        print(f"Error reading blob from {container_name}/{path_to_blob}: {e}")
        raise
    print(f"Successfully read blob from {container_name}/{path_to_blob}")
    return df
    

def from_polars_to_parquet(df: pl.DataFrame) -> BytesIO:
    """
    Serialize a Polars DataFrame into an in-memory Parquet file.

    The Parquet bytes are produced with the PyArrow writer, and the buffer is
    rewound to offset 0 before being handed back, so it can be read from the start.

    :param df: Polars DataFrame to serialize
    :return: BytesIO holding the Parquet bytes, positioned at the beginning
    """
    out = BytesIO()
    df.write_parquet(out, use_pyarrow=True)
    # write_parquet leaves the cursor at the end of the data; rewind for readers.
    out.seek(0)
    return out


def merge_dataframes_on_hashedId(df1: pl.DataFrame, df2: pl.DataFrame) -> pl.DataFrame:
    """
    Union two frames keyed on "_hashedId", letting df2 win on collisions.

    Every row of df2 is kept. A row of df1 is appended only when its
    "_hashedId" does not occur anywhere in df2. The output lists df2's rows
    first, followed by the surviving df1 rows.

    NOTE(review): the run cell calls this as (existing_data, new_data), so the
    "new records found" log line actually counts previously stored rows that
    were not re-scraped.

    :param df1: First Polars DataFrame
    :param df2: Second Polars DataFrame
    :return: Concatenation of df2 and the df1 rows whose "_hashedId" is absent from df2
    :raises Exception: Any join/concat error is printed and re-raised
    """
    try:
        # Anti-join keeps only the df1 rows with no "_hashedId" match in df2.
        df1_only = df1.join(df2, on="_hashedId", how="anti")

        kept = df1_only.height
        if kept == 0:
            print("No new records to merge.")
        else:
            print(f"{kept} new records found. Merging them.")

        return pl.concat([df2, df1_only], how="vertical")
    except Exception as e:
        print(f"Error occurred while merging dataframes: {e}")
        raise


def get_current_datetime(timezone: str = 'UTC') -> str:
    """
    Return the current wall-clock time in ``timezone`` as 'YYYY-MM-DD HH:MM:SS'.

    :param timezone: Timezone name understood by pytz, default 'UTC'
    :return: Formatted timestamp string (no offset suffix)
    """
    tz = pytz.timezone(timezone)
    now_local = datetime.now(tz)
    return now_local.strftime('%Y-%m-%d %H:%M:%S')


def get_teams_url(epl_teams, number_of_pages) -> List[List[Union[str, int, str]]]:
    """
    Build one [team_name, page_number, url] entry per team and page.

    Pages run from 1 to ``number_of_pages`` inclusive. Entries are grouped by
    team, in the dictionary's iteration order, with pages ascending within a team.

    :param epl_teams: Mapping of team display name -> URL slug
    :param number_of_pages: How many pages to generate for each team
    :return: List of [team_name, page_number, url]
    """
    return [
        [team_name, page, f"{BASE_URL}/{slug}/?page={page}"]
        for team_name, slug in epl_teams.items()
        for page in range(1, number_of_pages + 1)
    ]


def generate_hash(content: str) -> str:
    """
    Return the hex SHA-256 digest of ``content`` encoded as UTF-8.

    :param content: Text to hash
    :return: 64-character lowercase hex digest
    :raises Exception: Any failure while encoding/hashing (e.g. a non-str input)
        is printed and then re-raised unchanged
    """
    try:
        digest = hashlib.sha256(content.encode('utf-8'))
    except Exception as e:
        print(f"Error generating hash: {e}")
        raise
    return digest.hexdigest()


def create_blob_name(timestamp: str) -> str:
    """
    Derive the daily blob name 'epl_news_YYYY_MM_DD' from a timestamp string.

    Only the text before the first space is used, with '-' turned into '_'.

    :param timestamp: Date/time string such as '2024-10-03 10:46:15'
    :return: Blob name such as 'epl_news_2024_10_03'
    """
    day, _sep, _time = timestamp.partition(' ')
    return "epl_news_" + day.replace('-', '_')


def create_dataframe(input_df: List[List[Union[str, int, str]]], datetime_now: str) -> pl.DataFrame:
    """
    Build the bronze-layer DataFrame from scraped rows.

    Each input row is ``[teamName, page, html]``. Two columns are added:
      * ``_extractedDate``: ``datetime_now`` parsed with '%Y-%m-%d %H:%M:%S',
        the same value on every row;
      * ``_hashedId``: SHA-256 hex digest of the ``html`` column (via ``generate_hash``).

    :param input_df: Input rows as [teamName, page, html]
    :param datetime_now: Timestamp string in '%Y-%m-%d %H:%M:%S' format
    :return: DataFrame with columns _hashedId, _extractedDate, teamName, page, html
    """
    schema = pl.Schema({
        "teamName": pl.String(),
        "page": pl.Int8,
        "html": pl.String(),
    })

    df = pl.from_records(input_df, schema=schema, orient="row")

    return (
        df.with_columns(
            # Parse the single timestamp once and let Polars broadcast it. A per-row
            # Python list is wasteful, and for empty input the resulting empty,
            # untyped Series could not be parsed through the `.str` namespace.
            pl.lit(datetime_now)
            .str.strptime(pl.Datetime, format="%Y-%m-%d %H:%M:%S")
            .alias("_extractedDate"),
            pl.col("html")
            .map_elements(generate_hash, return_dtype=pl.Utf8)
            .alias("_hashedId"),
        )
        .select(["_hashedId", "_extractedDate", "teamName", "page", "html"])
    )


async def get_page(
    team_name: str,
    page_number: int,
    url: str,
    session: aiohttp.ClientSession,
    retries: int = 3,
    backoff_factor: float = 2,
    initial_delay: float = 1,
) -> List[Union[str, int]]:
    """
    Fetches the HTML content of a webpage asynchronously, retrying transient failures.

    Client/network errors, timeouts and HTTP 429 are retried with exponential
    backoff (``initial_delay``, then multiplied by ``backoff_factor``). Any other
    non-2xx status is returned immediately as an error row without retrying.

    :param team_name: Name of the team
    :param page_number: Page number to fetch
    :param url: The URL to request
    :param session: An aiohttp ClientSession object for making requests
    :param retries: Maximum number of attempts (default 3)
    :param backoff_factor: Multiplier applied to the delay after each failed attempt
    :param initial_delay: Seconds to wait after the first failed attempt
    :return: [team_name, page_number, html] on success, otherwise
        [team_name, page_number, error_message]
    """
    delay = initial_delay

    for attempt in range(1, retries + 1):
        try:
            async with session.get(url) as response:
                if 200 <= response.status < 300:
                    print(f"Success: {team_name} Page {page_number}")
                    return [team_name, page_number, await response.text()]
                if response.status == 429:  # Too Many Requests
                    # Raised so the handler below applies the same backoff as network errors.
                    raise aiohttp.ClientError(f"HTTP error 429 for {url}")
                # Any other non-2xx status is treated as permanent: no retry.
                print(f"Failed with status {response.status} for {team_name}, page {page_number}")
                return [team_name, page_number, f"HTTP error {response.status}"]
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            if attempt == retries:
                # Last attempt: no point sleeping before giving up.
                print(f"Error {e} for {team_name}, page {page_number} (Attempt {attempt}/{retries})")
                break
            print(f"Error {e}, retrying in {delay} seconds for {team_name}, page {page_number} (Attempt {attempt}/{retries})")
            await asyncio.sleep(delay)
            delay *= backoff_factor  # Increase the delay exponentially

    # All attempts exhausted (also reached when retries < 1): report an error row instead of None.
    print(f"Failed after {retries} attempts for {team_name}, page {page_number}")
    return [team_name, page_number, f"Failed after {retries} attempts"]


async def get_all_pages(teams_details: List[Tuple[str, int, str]], session: aiohttp.ClientSession) -> List[List[str]]:
    """
    Creates asynchronous tasks to fetch pages for all teams and collects the results.

    Results are returned in the same order as ``teams_details``. If a fetch task
    raises instead of returning a row (e.g. an exception ``get_page`` does not
    catch), it is converted into an error row for that team/page. That way the
    output is always a list of [team_name, page_number, content_or_error] rows
    and never contains raw exception objects.

    :param teams_details: A list of (team_name, page_number, url) entries
    :param session: An aiohttp ClientSession object for making requests
    :return: A list of [team_name, page_number, page content or error message]
    """
    tasks = [asyncio.create_task(get_page(team_name, page_number, url, session))
             for team_name, page_number, url in teams_details]

    # gather preserves input order, so outcomes can be zipped back onto teams_details.
    outcomes = await asyncio.gather(*tasks, return_exceptions=True)

    results = []
    for (team_name, page_number, _url), outcome in zip(teams_details, outcomes):
        if isinstance(outcome, BaseException):
            print(f"Unexpected error for {team_name}, page {page_number}: {outcome!r}")
            results.append([team_name, page_number, f"Unexpected error: {outcome}"])
        else:
            results.append(outcome)
    return results


async def scrapper(urls: List[Tuple[str, int, str]]) -> List[List[str]]:
    """
    Open one aiohttp session and fetch every (team_name, page_number, url) entry with it.

    The session is closed when the ``async with`` block exits, after all
    fetches have been gathered.

    :param urls: A list of (team_name, page_number, url) entries
    :return: A list of lists containing the team name, page number, and page content or error messages
    """
    async with aiohttp.ClientSession() as session:
        return await get_all_pages(urls, session)

In [None]:
# Get the current datetime
datetime_now = get_current_datetime()

# Get the list of team URLs to scrape
team_urls = get_teams_url(EPL_TEAM, 2)

# Scrape the data from the URLs asynchronously
results = await scrapper(team_urls)

# Create a new Polars DataFrame from the scraped results
df_new = create_dataframe(results, datetime_now)

# Load environment variables
connection_string = os.environ.get("CONN_STRING_AZURE_STORAGE")
if connection_string is None:
    raise EnvironmentError("Azure storage connection string not found in environment variables.")

# Create a blob client for Azure Blob Storage
blob_service_client = create_blob_client_with_connection_string(connection_string)

# Define the container and path for the blob storage
container_name = "bronze"
folder_name = "epl_news"
blob_name = create_blob_name(datetime_now)
path = f"{folder_name}/{blob_name}.parquet"

# Read the existing blob data from Azure Blob Storage, if available
df_actual: Optional[pl.DataFrame] = read_blob_from_container(container_name, path, blob_service_client)

if df_actual is None:
    # If no existing data, write the new DataFrame directly to the blob
    print("No existing data found, writing new data to blob...")
    write_blob_to_container(df_new, container_name, path, blob_service_client)
else:
    # If existing data is found, merge it with the new data
    print("Existing data found, merging with new data...")
    df_merged = merge_dataframes_on_hashedId(df_actual, df_new)

    # Write the merged DataFrame back to the blob
    write_blob_to_container(df_merged, container_name, path, blob_service_client)
        
print("Operation completed successfully.")