In [None]:
pip install dotenv
pip install minio

In [1]:
# These are dummy. They were changed long before this notebook got near the internet, so they are safe to show. 
credentials = {
        "key": "PKCTYL4MO5SA2QEIZL2TDEJRTA",
        "secret_key": "DKL2eLVYFcxKzMJtDYbuG3zppWjpwHzsTS1DtHVwc9Cz" 
    }

## Utilities

In [2]:
import json
import logging
import re
import requests
import time

from datetime import datetime, timedelta, timezone, UTC
from pathlib import Path
from typing import Union, Tuple, Optional
from urllib.parse import urlencode

In [52]:
# Used in:
    #get_most_active_stocks
    #get_top_movers
headers = {
    "accept": "application/json",
    "APCA-API-KEY-ID": credentials["key"],
    "APCA-API-SECRET-KEY": credentials["secret_key"]
}

def get_tickers() -> None:
    # get S&P 500 tickers
    # no arguments, returns nothing, creates json list with tickers
    from bs4 import BeautifulSoup
    
    response = requests.get('https://stockanalysis.com/list/sp-500-stocks/')
    soup = BeautifulSoup(response.content, 'html.parser')
    
    # Find all <td> elements with the specific class
    td_elements = soup.find_all('td', class_='sym svelte-1ro3niy')
    
    # Extract text from <a> tags inside those <td> elements
    tickers = []
    for td in td_elements:
        a_tag = td.find('a')
        if a_tag and a_tag.text.strip():
            tickers.append(a_tag.text.strip())
            
    # Write those into json file
    with open("tickers.json", "w") as json_file:
        json.dump(tickers, json_file)


# get top active stocks
def get_most_active_stocks(by: str = "volume", top: int = 100) -> str:
    """Calls API and returns pretty formated json string of most active stocks at the moment of last update. 
    Args:
        by: either 'volume' or 'trades'
        top: a number between 1 and 100"""
    
    url = f"https://data.alpaca.markets/v1beta1/screener/stocks/most-actives?by={by}&top={top}"
    response = requests.get(url, headers=headers)
    
    return response.text


# get top market movers
def get_top_movers(top: int = 100) -> str:
    """Calls API and returns pretty formated json string of top market movers, both gainers and losers, at the moment of last update. 
    Unfortunately, it doesn't filter out penny stocks, so we can't look at stocks worth, say, $5 and more.
    Args:
        top: a number between 1 and 50. How many of each gainers and losers to return."""
    
    url = f"https://data.alpaca.markets/v1beta1/screener/stocks/movers?top={top}"
    response = requests.get(url, headers=headers)
    
    return response.text

# Used in validate_arguments
def validate_time_frame(time_frame: str) -> Union[bool, str]:
    """
    Validate if time_frame satisfies the aggregation format:
    - [1-59]Min or [1-59]T for minutes
    - [1-23]Hour or [1-23]H for hours  
    - 1Day or 1D for days
    - 1Week or 1W for weeks
    - [1,2,3,4,6,12]Month or [1,2,3,4,6,12]M for months
    """
    # ^(\d+) - Capture digits at start
    # (Min|T|Hour|H|Day|D|Week|W|Month|M)$ - Specific suffixes
    match = re.match(r'^(\d+)(Min|T|Hour|H|Day|D|Week|W|Month|M)$', str(time_frame))
    
    if not match:
        return False, "Invalid format. Must be: [number][unit]"
    
    value = int(match.group(1))
    unit = match.group(2)
    
    # Validate based on unit
    if unit in ['Min', 'T']:
        if not (1 <= value <= 59):
            return False, "Minutes must be between 1-59"
            
    elif unit in ['Hour', 'H']:
        if not (1 <= value <= 23):
            return False, "Hours must be between 1-23"
            
    elif unit in ['Day', 'D']:
        if value != 1:
            return False, "Days must be exactly 1"
            
    elif unit in ['Week', 'W']:
        if value != 1:
            return False, "Weeks must be exactly 1"
            
    elif unit in ['Month', 'M']:
        valid_months = [1, 2, 3, 4, 6, 12]
        if value not in valid_months:
            return False, f"Months must be one of {valid_months}"
    
    return True, None

# Used in validate_arguments
def validate_datetime(date_string: str) -> Union[str, str]:
    logging.basicConfig(filename='datetime.log', level=logging.ERROR)
    
    if not isinstance(date_string, str):
        logging.info((f"date_string is of type '{type(date_string)}'"))
        return None, "Input must be a string"
    
    # Pattern for YYYY-MM-DD
    yyyy_mm_dd_pattern = r'^\d{4}-\d{2}-\d{2}$'
    # Pattern for YYYY-MM-DDThh:mm:ss (with optional timezone)
    datetime_pattern = r'^\d{4}-\d{2}-\d{2}[Tt ]\d{2}:\d{2}:\d{2}'
    
    date_string = date_string.strip()
    try:
        if re.match(yyyy_mm_dd_pattern, date_string):
            logging.info(("yyyy_mm_dd matched"))
            date_string = str(datetime.strptime(date_string, "%Y-%m-%d").isoformat())
            logging.info((f"Before: {date_string}\nAfter: {date_string}\n-------------------------------"))
            return date_string, None
        elif re.match(datetime_pattern, date_string):
            logging.info(("datetime matched"))
            # Replace space with T
            if ' ' in date_string:
                date_string = date_string.replace(' ', 'T')
            # Handle different timezone cases
            if date_string.upper().endswith('Z'):
                # Already has Z timezone
                date_string = str(datetime.fromisoformat(date_string.replace('Z', '+00:00')).isoformat())
                logging.info((f"Before: {date_string}\nAfter: {date_string}\n-------------------------------"))
                return date_string, None
            elif '+' in date_string or '-' in date_string[10:]:
                # Has timezone offset
                date_string = str(datetime.fromisoformat(date_string).isoformat())
                logging.info((f"Before: {date_string}\nAfter: {date_string}\n-------------------------------"))
                return date_string, None
            else:
                date_string = date_string[:19]
                date_string = str(datetime.fromisoformat(date_string).replace(tzinfo=timezone.utc).isoformat())
                logging.info((f"Before: {date_string}\nAfter: {date_string}\n-------------------------------"))
                return date_string, None
        else:
           return date_string, ("Unsupported date format. Use YYYY-MM-DD, YYYY-MM-DDThh:mm:ss or ime expressed in RFC-3339 format")
            
    except ValueError as e:
        return date_string, (f"Invalid date: {str(e)}")
    except Exception as e:
        return date_string, (f"Error processing date: {str(e)}")


# Used in get_historical_bars
def validate_arguments(
    tickers_to_search: list[str],
    date_start: str,
    date_end: str,
    time_frame: str = "1D",
    limit: int = 10000,) -> Tuple[bool, list[str]]:
    """ 
        This function validate arguments before passing those into API call.
        Args:
            tickers_to_search:
                non-empty list of stock tickers.
            time_frame:
                [1-59]Min or [1-59]T, e.g. 5Min or 5T creates 5-minute aggregations.
                [1-23]Hour or [1-23]H, e.g. 12Hour or 12H creates 12-hour aggregations.
                1Day or 1D creates 1-day aggregations.
                1Week or 1W creates 1-week aggregations.
                [1,2,3,4,6,12]Month or [1,2,3,4,6,12]M, e.g. 3Month or 3M creates 3-month aggregations.
            date_start and date-end:
                string in YYYY-MM-DD or rfc-3339 format. date_start should be an earlier date than date_end.
            limit: 
                number between 1 and 10000.
        Returns:
            Tuple of (is_valid, error_messages)
                """
        
    error_messages = []
    # Validate tickers list
    if len(tickers_to_search) < 1:
        error_messages.append("'tickers_to_search' must be a non-emplty list of tickers")
        
    # Validate limit
    if not (1 <= limit <= 10000):
        error_messages.append(f"'limit' must be between 1 and 10000. Your input: {limit}")
        
    # Validate timeframe
    is_time_frame_valid, error_message = validate_time_frame(time_frame)
    if not is_time_frame_valid:
        error_messages.append(f"{error_message}. Your input: {time_frame}")
        
    # Validate dates
    dates_legit = True
    for date in [date_start, date_end]:
        date, error_message = validate_datetime(date)
        if error_message is not None:
            error_messages.append(f"{error_message}. Your input: {date}")
            dates_legit = False
            
    # If dates are valid, check if order is valid, too. 
    if dates_legit == True:
        if datetime.fromisoformat(date_start) >= datetime.fromisoformat(date_end):
            error_messages.append(f"date_start should be an earlier date than date_end. Your date_start: {date_start} and date_end: {date_end}")

    return len(error_messages) == 0, error_messages


# Used in get_historical_bars
def write_into_json_file(data: str,
                         dates: Tuple[str, str],
                         i: int = 0,
                         path: Path = Path.cwd()) -> None:
    first_ticker = list(data['bars'].keys())[0]
    first_date = data['bars'][first_ticker][0]['t']
    last_ticker = list(data['bars'].keys())[-1]
    last_date = data['bars'][first_ticker][-1]['t']
    file_name = f"{first_ticker}_{first_date}-{last_ticker}_{last_date}.json"
    path = Path.cwd().parent / "data" / file_name
    with open(path, "w") as json_file:
        json.dump(data, json_file)


def get_historical_bars(
    tickers_to_search: list[str],
    time_frame: str = "1D",
    date_start: str = None,
    date_end: str = None,
    limit: int = 10000,) -> bool:
    """ 
        Saves json in file or prints errors made in arguments.
        Args:
            tickers_to_search:
                non-empty list of stock tickers.
            time_frame:
                [1-59]Min or [1-59]T, e.g. 5Min or 5T creates 5-minute aggregations.
                [1-23]Hour or [1-23]H, e.g. 12Hour or 12H creates 12-hour aggregations.
                1Day or 1D creates 1-day aggregations.
                1Week or 1W creates 1-week aggregations.
                [1,2,3,4,6,12]Month or [1,2,3,4,6,12]M, e.g. 3Month or 3M creates 3-month aggregations.
            date_start and date-end:
                string in YYYY-MM-DD or rfc-3339 format. date_start should be an earlier date than date_end.
            limit: 
                number between 1 and 10000.
        Returns:
            bool. 
                """
        
    # Set defaults properly (evaluated at call time)
    if date_start is None:
        date_start = (datetime.now().date() - timedelta(days=1)).isoformat()
    if date_end is None:
        date_end = datetime.now().date().isoformat()
        
    # Validate arguments
    is_valid, error_messages = validate_arguments(tickers_to_search, date_start, date_end, time_frame, limit)
    if not is_valid:
        for error in error_messages:
            print(error)
        return False
            
    params = {
    'symbols': ",".join(tickers_to_search),
    'timeframe': time_frame, 
    'start': date_start,
    'end': date_end,
    'limit': limit,
    'adjustment': 'raw',
    'feed': 'sip', 
    'page_token': "",
    'sort': "asc"
    }
    while params['page_token'] is not None: # while there is next page
        url = f"https://data.alpaca.markets/v2/stocks/bars?{urlencode(params)}"
        response = requests.get(url, headers=headers)
        parsed = json.loads(response.text)
        write_into_json_file(parsed, (date_start, date_end))
        params['page_token'] = parsed['next_page_token']
        time.sleep(7)
    return True

In [6]:
date_strings = [
        # Valid YYYY-MM-DD
        "2023-12-25",
        "2000-01-01",
        "1999-12-31",
        
        # Valid RFC-3339
        "2023-12-25T10:30:45Z",
        "2023-12-25T10:30:45+00:00",
        "2023-12-25T10:30:45-05:00",
        "2023-12-25T10:30:45.123Z",
        "2023-12-25T10:30:45.123456+02:00",
        "2025-11-28T09:48:08.075452", # Missing timezone
        "2023-12-25T10:30:45",  # Missing timezone
        
        # Invalid cases
        "2023-13-25",  # Invalid month
        "2023-12-32",  # Invalid day
        "2023-13-35", # Both day and month are invalid
        "2023/12/25",  # Wrong separator
        "25-12-2023",  # Wrong order
        "2023-12-25T25:30:45Z",  # Invalid hour
        "not-a-date",
        "2023-12-25T10:30:45+5:00",  # Timezone without leading zero
    ]


## airflow

In [3]:
import io
import os

from minio import Minio
from minio.error import S3Error
from dotenv import load_dotenv


In [106]:
def get_minio_credentials(path_to_env_file: Union[str, Path] = "../config/minio.env") -> str:
    """This function returns login and password for root user of minio server, getting those from 'minio.env' file.
    'minio.env' should have MINIO_ROOT_USER and MINIO_ROOT_PASSWORD variables. If there are no such variables, asks user to provide those via input.
    Args:
        path_to_env_file: either string or pathlib.Path object leading to minio.env file."""
    load_dotenv(path_to_env_file)
    MINIO_ROOT_USER = os.getenv("MINIO_ROOT_USER")
    MINIO_ROOT_PASSWORD = os.getenv("MINIO_ROOT_PASSWORD")
    if (MINIO_ROOT_USER and MINIO_ROOT_PASSWORD):
        return MINIO_ROOT_USER, MINIO_ROOT_PASSWORD 
    else:
        print(f"There are no MINIO_ROOT_USER and/or MINIO_ROOT_PASSWORD variables in {path_to_env_file}")

login, password = get_minio_credentials()
print(login, password)

admin admin_password


In [110]:
# Create a client
# args: login, password
host = "localhost"
client = Minio(
    endpoint=f"{host}:9000",
    access_key=login,
    secret_key=password,
    secure=False
    )

In [111]:
#create a bucket
# args: 
    #client:Minio
bucket_name = "bucketass"

def create_bucket(bucket_name: str, client: Minio) -> None:
    found = client.bucket_exists(bucket_name=bucket_name)
    if not found:
        client.make_bucket(bucket_name=bucket_name)
        print("Created bucket", bucket_name)
    else:
        print("Bucket", bucket_name, "already exists")

In [112]:
create_bucket(bucket_name=bucket_name, client=client)

Created bucket bucketass


In [120]:

source_file = "test.txt" # str or path
#make two fput and put for string and stream uploads
def upload_to_bucket(source_file: Union[str, Path, bytes], bucket_name: str, client: Minio, destination_file: str=None) -> None:
    # Make the bucket if it doesn't exist.
    create_bucket(bucket_name, client)
    # Check if name of file has to be changed before upload:
    if destination_file is None:
        destination_file = Path(source_file).name
    # upload file
    client.fput_object(
        bucket_name=bucket_name,
        object_name=destination_file,
        file_path=source_file
    )
    print(
        "successfully uploaded object", destination_file, "to bucket", bucket_name,
    )

In [121]:
upload_to_bucket(source_file, bucket_name, client)

Bucket bucketass already exists
successfully uploaded object test.txt to bucket bucketass


In [None]:
#tutor 
import textwrap
from datetime import datetime, timedelta

# Operators; we need this to operate!
from airflow.providers.standard.operators.bash import BashOperator, PythonOperator

# The DAG object; we'll need this to instantiate a DAG
from airflow.sdk import DAG

def simple_print():
    print("It is a simple print")
    
with DAG(
    "tutorial",
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        "depends_on_past": False,
        "retries": 1,
        "retry_delay": timedelta(seconds=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
        # 'wait_for_downstream': False,
        # 'execution_timeout': timedelta(seconds=300),
        # 'on_failure_callback': some_function, # or list of functions
        # 'on_success_callback': some_other_function, # or list of functions
        # 'on_retry_callback': another_function, # or list of functions
        # 'sla_miss_callback': yet_another_function, # or list of functions
        # 'on_skipped_callback': another_function, #or list of functions
        # 'trigger_rule': 'all_success'
    },
    description="A simple tutorial DAG",
    schedule=timedelta(seconds=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:

    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id="print_date",
        bash_command="date",
    )

    t2 = BashOperator(
        task_id="sleep",
        depends_on_past=False,
        bash_command="sleep 5",
        retries=3,
    )
    t1.doc_md = textwrap.dedent(
        """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](https://imgs.xkcd.com/comics/fixing_problems.png)
    **Image Credit:** Randall Munroe, [XKCD](https://xkcd.com/license.html)
    """
    )

    dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG; OR
    dag.doc_md = """
    This is a documentation placed anywhere
    """  # otherwise, type it like this
    templated_command = textwrap.dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
    {% endfor %}
    """
    )

    t3 = BashOperator(
        task_id="templated",
        depends_on_past=False,
        bash_command=templated_command,
    )

    t1 >> [t2, t3]


In [25]:
date_start = datetime.now().date() - timedelta(days=41)
date_end = datetime.now() - timedelta(days=31)
date_start_str = date_start.isoformat()
date_end_str = date_end.isoformat()
date_start, date_end, date_start_str, date_end_str

(datetime.date(2025, 10, 22),
 datetime.datetime(2025, 11, 1, 12, 32, 10, 970827),
 '2025-10-22',
 '2025-11-01T12:32:10.970827')

In [28]:
tickers_to_search = tickers[:2]
tickers_to_search

['NVDA', 'AAPL']

In [57]:
# Since we can only get 15 minutes delayed data, we lag date_end 15 minutes back from now if date_end is today.
if date_end.date() == datetime.now().date():
    date_end = datetime.now().replace(microsecond=0) - timedelta(minutes=15) 

# We have rate limit for 200 api calls per minute and 10000 records mer call. So we can only gather 11 days of historical data in one minute,
# which allows us to make one call every 6-7 seconds to not exceed rate limit.
date_start = date_start - timedelta(days=1) # offset for while loop to not step over date_end
while date_start < date_end.date():
    date_start = date_start + timedelta(days=1)
    #if we stepped over date_end, clamp both dates within the day of date_end
    if date_start > date_end.date(): 
        date_start_str = date_end.date().isoformat()
        date_end_str = date_end.isoformat()
    # if we did not, then gather 11 days from date_start
    else:
        date_start_str = date_start.isoformat()
        date_end_str = (date_start + timedelta(days=11)).isoformat()
        
    get_historical_bars(tickers_to_search, date_start=date_start_str, date_end=date_end_str, time_frame="1Min")

In [15]:
datetime.now().date() - timedelta(days=5*365)

datetime.date(2020, 12, 3)

In [7]:
tickers_to_search = ["NVDA", "AAPL"]
time_frame = "1Min"
date_start = "2025-11-10"
date_end = "2025-11-25"
limit = 10000

params = {
    'symbols': ",".join(tickers_to_search),
    'timeframe': time_frame, 
    'start': date_start,
    'end': date_end,
    'limit': limit,
    'adjustment': 'raw',
    'feed': 'sip', 
    'page_token': "",
    'sort': "asc"
    }
i = 0
    #while params['page_token'] is not None: # while there is next page
url = f"https://data.alpaca.markets/v2/stocks/bars?{urlencode(params)}"
response = requests.get(url, headers=headers)
parsed = json.loads(response.text)
params['page_token'] = parsed['next_page_token']
i += 1

In [54]:
with open('tickers.json', "r") as tickers_file:
    tickers = json.load(tickers_file)
tickers[0]

'NVDA'

In [None]:
date_start = datetime.now(UTC).replace(microsecond=0, second=0, minute=0, hour=0) - timedelta(days=365*5, hours=5)
date_end = datetime.now(UTC).replace(microsecond=0) - timedelta(hours=5, minutes=15)
get_historical_bars(tickers_to_search = tickers_to_search,
time_frame = "1Min",
date_start = date_start.isoformat(),
date_end = date_end.isoformat(),
limit = 10000)

In [80]:
tickers_to_search = tickers.copy()
tickers_to_search.remove('AAPL')
tickers_to_search.remove('NVDA')

In [82]:
len(tickers_to_search)

501