In [None]:
# Importing necessary files and packages
import json
import os
import re
from datetime import date, datetime, timedelta
from pathlib import Path
from typing import List, Union

import numpy as np
from dotenv import load_dotenv

from StockETL import DateTimeUtil

In [2]:
# Load environment variables from a .env file
# (presumably so the os.getenv lookup below can see values defined there).
load_dotenv()


# Lower-cased value of the USERNAME environment variable.
# NOTE(review): os.getenv returns None when the variable is unset, so
# str(None).lower() produces the literal string "none" instead of failing;
# confirm that this silent fallback is intended.
USERNAME = str(os.getenv("USERNAME")).lower()
# Echo the resolved value using the self-documenting f-string form (name = repr).
# NOTE(review): this stores the username in the saved notebook output.
print(f"{USERNAME = }")

USERNAME = 'ptprashanttripathi'


In [7]:
# Function to extract the year and month from a filename


def extract_year_month(file_name):
    """
    Extracts the year and month from a given filename and returns a date object.

    The name is lower-cased and every run of non-alphanumeric characters is
    collapsed to a single space before matching. The year is the first
    occurrence of "20" followed by two digits. The month is recognised by its
    English name or abbreviation: whole alphabetic words are checked first, so
    that words such as "summary" or "market" are not mistaken for "mar". Only
    when no whole word is a month does the function fall back to a plain
    substring search, which keeps run-together names such as
    "tradebookjan2023" working.

    Parameters:
    file_name (str or pathlib.Path): The filename from which to extract the year and month.

    Returns:
    datetime.date: The first day of the extracted month.

    Raises:
    ValueError: If no year or no month can be found in the filename.
    """
    # Clean and normalize the filename string; str() lets Path objects through
    file_date_str = re.sub(r"[^A-Za-z0-9]+", " ", str(file_name)).lower()

    # Extract year
    year_match = re.search(r"20\d{2}", file_date_str)
    if not year_match:
        raise ValueError("Year not found in filename")
    year = int(year_match.group(0))

    # Mapping from month abbreviations / full names to month numbers.
    # Insertion order matters: it decides which month wins when several match.
    month_mapping = {
        "jan": 1,
        "feb": 2,
        "mar": 3,
        "apr": 4,
        "may": 5,
        "jun": 6,
        "jul": 7,
        "aug": 8,
        "sep": 9,
        "sept": 9,
        "oct": 10,
        "nov": 11,
        "dec": 12,
        "january": 1,
        "february": 2,
        "march": 3,
        "april": 4,
        "june": 6,
        "july": 7,
        "august": 8,
        "september": 9,
        "october": 10,
        "november": 11,
        "december": 12,
    }

    # Alphabetic words only, so "jan2023" still yields the word "jan"
    words = set(re.findall(r"[a-z]+", file_date_str))

    # First pass: a month must be a whole word of the filename
    month = next(
        (number for name, number in month_mapping.items() if name in words),
        None,
    )

    # Second pass: substring search for names without separators
    if month is None:
        month = next(
            (
                number
                for name, number in month_mapping.items()
                if name in file_date_str
            ),
            None,
        )

    if month is None:
        raise ValueError("Month not found in filename")

    # Build the date directly instead of round-tripping through a string
    return datetime(year, month, 1).date()

In [9]:
# function to calc the first date of the given week and year
# Define a Python function to compute the first date of the given week and year


def get_first_date_of_week(year, week):
    """
    Return the Monday that starts the given week of the given year.

    Week 1 is the Monday-to-Sunday week containing 1 January when that day
    falls on Monday through Thursday; otherwise week 1 starts on the first
    Monday after 1 January. The week number is not range-checked: values
    beyond the last week of the year simply roll into the following year.

    Parameters:
    year (int): Calendar year.
    week (int): 1-based week number within that year.

    Returns:
    datetime.date: The Monday of the requested week.
    """
    # Use the date/timedelta classes directly: in this file `datetime` is the
    # class (from datetime import datetime), not the module, so
    # datetime.date(...) / datetime.timedelta(...) would fail at call time.
    first_day_of_year = date(year, 1, 1)
    weekday = first_day_of_year.weekday()  # Monday == 0 ... Sunday == 6

    if weekday > 3:
        # 1 January is Fri/Sat/Sun: week 1 begins on the following Monday
        week_one_monday = first_day_of_year + timedelta(days=7 - weekday)
    else:
        # 1 January is Mon-Thu: week 1 is the week that contains it
        week_one_monday = first_day_of_year - timedelta(days=weekday)

    return week_one_monday + timedelta(weeks=week - 1)

In [10]:
# function to extract year and week number


def extract_year_week(file_path):
    """
    Extract the year and week number from a path such as "2023 Week 05.csv".

    The path is lower-cased and spaces are removed, then the first occurrence
    of a four-digit year, the word "week" and a one- or two-digit week number
    is taken. Non-alphanumeric separators (for example "_", "-" or "/") are
    tolerated on either side of "week".

    Parameters:
    file_path (str or pathlib.Path): Path or filename to inspect.

    Returns:
    tuple[int, int]: (year, week) as integers.

    Raises:
    ValueError: If no "<year>week<number>" pattern is present. ValueError is
        a subclass of Exception, so callers catching Exception still work.
    """
    pattern = re.compile(r"(\d{4})[^a-z0-9]*week[^a-z0-9]*(\d{1,2})")
    # str() lets Path objects through; spaces are dropped as before
    match = pattern.search(str(file_path).lower().replace(" ", ""))
    if match is None:
        raise ValueError("Year and Week number not found in file_path")
    year, week = match.groups()
    return int(year), int(week)

In [13]:
# DataFrame to Schema


def get_correct_datatype(input_datatype):
    """
    Map a data type name onto the canonical type name used in the schema.

    The input is converted to a string, lower-cased and stripped before an
    exact lookup against the known aliases. Unknown names are reported on
    stdout and returned in their normalized (lower-cased, stripped) form.

    Parameters:
    input_datatype: Any value whose string form names a data type.

    Returns:
    str: The canonical type name, or the normalized input when unrecognised.
    """
    aliases_by_canonical = {
        "Date": ["date"],
        "string": ["string", "varchar", "char", "text", "object"],
        "Long": ["bigint", "int", "tinyint", "long"],
        "Timestamp": ["timestamp", "datetime"],
        "Double": ["double", "float", "decimal"],
        "Boolean": ["bool", "boolean"],
    }
    # Flatten to alias -> canonical so the lookup is a single dict access
    canonical_by_alias = {
        alias: canonical
        for canonical, aliases in aliases_by_canonical.items()
        for alias in aliases
    }

    normalized = str(input_datatype).lower().strip()
    try:
        return canonical_by_alias[normalized]
    except KeyError:
        print(f"undefined data type => {normalized}")
        return normalized

In [14]:
# DataType	Description
# date	Store Date Only
# datetime	Store Date and Time
# string	String or character
# int	Represents 4-byte signed integer numbers. The range of numbers is from -2147483648 to 2147483647.
# long	Represents 8-byte signed integer numbers. The range of numbers is from -9223372036854775808 to 9223372036854775807
# bit	bit
# timestamp	Date with time detail in ISO format
# double	Represents 8-byte double-precision floating point numbers
# float	Represents 4-byte single-precision floating point numbers.
# boolean	True or False

In [15]:
def create_data_contract(df, schema_path):
    """
    Write the column names and dtypes of a DataFrame to a JSON schema file.

    The file contains a single key, "data_schema", holding one
    {"col_name", "data_type"} entry per column. The data type is the raw
    string form of the column's dtype, not a mapped canonical type name.

    Parameters:
    df (pandas.DataFrame): Frame whose columns are described.
    schema_path (str or path-like): Destination of the JSON file.

    Returns:
    None. A confirmation message is printed once the file is written.
    """
    # Collect one entry per column
    column_entries = []
    for column_name, column_dtype in df.dtypes.to_dict().items():
        column_entries.append(
            {"col_name": column_name, "data_type": str(column_dtype)}
        )

    # Serialize first, then write the finished text in one go
    contract_text = json.dumps({"data_schema": column_entries}, indent=4)
    with open(schema_path, "w") as schema_file:
        schema_file.write(contract_text)

    print(f"Dictionary written to {schema_path}")

In [17]:
def get_financial_year(date: Union[datetime, date]) -> str:
    """
    Return the April-to-March financial year label for a date.

    Dates in January, February or March belong to the financial year that
    started in the previous calendar year; all other dates belong to the
    financial year starting in their own calendar year.

    Args:
    - date (Union[datetime, date]): The date to classify.

    Returns:
    - str: Label of the form 'FY<start year>-<last two digits of end year>',
      for example 'FY2023-24'.
    """
    # April or later: the financial year starts in this calendar year
    if date.month >= 4:
        fy_start = date.year
    else:
        fy_start = date.year - 1

    # Only the trailing two digits of the closing year appear in the label
    fy_end_suffix = str(fy_start + 1)[-2:]
    return "FY" + str(fy_start) + "-" + fy_end_suffix

In [18]:
def generate_date_list(start_date, end_date):
    """
    Build one DateTimeUtil per month, each set to day 1, covering a date range.

    Both bounds are capped at DateTimeUtil.today(), so no month after the
    current one is produced. Iteration begins at the (capped) start date
    itself and then steps to the first day of each following month, stopping
    once the cursor passes the (capped) end date.

    Args:
        start_date: Start of the range. Must be comparable with
            DateTimeUtil.today() (TODO confirm the expected type with callers).
        end_date: End of the range, same requirement as start_date.

    Returns:
        List[DateTimeUtil]: First-of-month objects in chronological order.
    """
    first_of_months = []
    cursor = min(start_date, DateTimeUtil.today())
    end_date = min(end_date, DateTimeUtil.today())

    while cursor <= end_date:
        first_of_months.append(DateTimeUtil(cursor.year, cursor.month, 1))

        if cursor.month != 12:
            # Same year: step to the first day of the next month
            cursor = cursor.replace(month=cursor.month + 1, day=1)
            continue

        # December rolls over to January of the following year
        cursor = cursor.replace(year=cursor.year + 1, month=1, day=1)

    return first_of_months

In [19]:
# Replace NaN float values with empty strings in nested dicts/lists (e.g. data exported from a DataFrame)


def replace_nan_with_empty(data):
    """
    Recursively replace NaN floats with empty strings.

    Dictionaries and lists are rebuilt with every value or item processed
    in turn; a float that is NaN becomes "". Any other value, including
    tuples and None, is returned unchanged.

    Parameters:
    data: A dict, list, float or any other value.

    Returns:
    The same structure with NaN floats replaced by "".
    """
    # Scalars first: only floats can be NaN
    if isinstance(data, float):
        return "" if np.isnan(data) else data

    # Containers are rebuilt, never modified in place
    if isinstance(data, list):
        return [replace_nan_with_empty(element) for element in data]
    if isinstance(data, dict):
        return {
            name: replace_nan_with_empty(entry) for name, entry in data.items()
        }

    return data