# TIME SERIES AUTOMATION PROJECT

## Business Problem:

 - Forecasting **S&P 500** prices with Modeltime and Prefect

## Task:
 - Build an automated trading system

## Steps:
 1. Get Bitcoin prices at a 5-minute interval
 2. Store the prices in a CSV file
 3. Forecast the price movement
 4. Store the forecasted prices in a CSV file every 60 seconds


# ETL process of **S&P 500** prices data

# GOALS
- Add `prefect` and examine the default CLI logging

### RUN COMMAND

Run `python path/ETL_flow_with_prefect.py` from the command line.

## GOALS
- Make a Deployment YAML file
- Expose scheduling to run the flow on an "Interval Schedule"
- **IMPORTANT**: The interval must be 60 seconds or greater (this is the minimum for the schedule to work)
- A `cron` schedule is also possible (the most common form of automation)
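
To make the interval rule concrete, here is a plain-Python sketch of what a fixed-interval schedule amounts to. It is only an illustration: `next_run_times` and `MIN_INTERVAL_SECONDS` are hypothetical names, not part of Prefect, and the 60-second floor simply mirrors the rule stated above.

```python
from datetime import datetime, timedelta
from typing import List

# Assumption: mirrors the 60-second minimum noted above; not a Prefect constant.
MIN_INTERVAL_SECONDS = 60


def next_run_times(start: datetime, interval_seconds: int, count: int) -> List[datetime]:
    """Return the first `count` run times of a fixed-interval schedule."""
    if interval_seconds < MIN_INTERVAL_SECONDS:
        raise ValueError(
            f"interval must be at least {MIN_INTERVAL_SECONDS} seconds, got {interval_seconds}"
        )
    step = timedelta(seconds=interval_seconds)
    # Each run is one more interval after the start time.
    return [start + i * step for i in range(1, count + 1)]


# Illustrative start time; three runs, one minute apart.
for run_time in next_run_times(datetime(2024, 1, 1, 9, 30), 60, 3):
    print(run_time.isoformat())
```

An interval below 60 seconds raises a `ValueError` in this sketch, which is one way to catch a misconfigured schedule before deploying it.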


## RESOURCES
 https://docs.prefect.io/concepts/schedules/

# LIBRARIES

In [1]:
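import subprocess
import sys

def install(package):
    # Use the running interpreter's pip so the package lands in this environment
    subprocess.check_call([sys.executable, "-m", "pip", "install", package])

# Import each package if it exists; otherwise install it with pip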
try:
    import prefect
except ImportError:
    install("prefect")

try:
    import yfinance
except ImportError:
    install("yfinance")

In [2]:
# Data manipulation
import pandas as pd

# Extracting stock data from yahoo finance website
import yfinance as yf

# ETL Automation process
from prefect import task, flow

In [3]:
# Access the yfinance API
data = yf.download(
    tickers = "AAPL",
    period = "3h",
    interval = "5m"
)
df = pd.DataFrame(data)
df.shape

[*********************100%%**********************]  1 of 1 completed


(36, 6)
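
The row count above is consistent with the request: 3 hours of 5-minute bars comes to 36 rows. A quick arithmetic check, where `expected_bars` is a hypothetical helper and the count assumes a bar exists for every interval in the period (which may not hold outside trading hours):

```python
def expected_bars(period_hours: float, interval_minutes: int) -> int:
    """Number of bars if every interval in the period has data."""
    return int(period_hours * 60 // interval_minutes)


print(expected_bars(3, 5))  # 36, matching the row count above
```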

## EXTRACT
- Fetch Bitcoin prices from the yfinance API.
- If the request fails, retry twice, waiting 3 seconds between attempts.

*We use the last 3 hours of Bitcoin prices to make a 5-minute-interval forecast with `modeltime`.*
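
The retry behaviour described above can be pictured with a plain-Python sketch. This is not how Prefect implements retries; it only illustrates what two retries with a 3-second delay mean in practice: one initial attempt plus up to two more. The names `call_with_retries` and `flaky_fetch` are hypothetical, and the `sleep` parameter is an assumption added so the example runs without actually waiting.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retries(
    func: Callable[[], T],
    retries: int = 2,
    delay_seconds: float = 3,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func; on failure, retry up to `retries` more times."""
    if retries < 0:
        raise ValueError("retries must be zero or greater")
    attempts = retries + 1  # one initial attempt plus the retries
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except Exception:
            if attempt == attempts:
                raise  # out of retries: surface the last error
            sleep(delay_seconds)  # wait before the next attempt
    raise AssertionError("unreachable")


# Usage: a simulated API call that fails twice, then succeeds.
calls = {"n": 0}


def flaky_fetch() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("simulated API failure")
    return "prices"


result = call_with_retries(flaky_fetch, sleep=lambda seconds: None)
print(result, calls["n"])  # prices 3
```

With two retries the call succeeds on the third attempt; a fourth consecutive failure would never be tried, and the last error would propagate instead.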

In [4]:
@task(
    name="Extract S&P 500 Stock Prices",
    retries=2,
    retry_delay_seconds=3
)
def extract_sp500_prices(
    tickers: str,
    period: str,
    interval: str
    ) -> pd.DataFrame:

  # Access the yfinance API
  data = yf.download(
      tickers = tickers,
      period = period,
      interval = interval
  )
  # Save a raw copy (hard-coded Colab path)
  pd.DataFrame(data).to_csv("/content/sp500_prices.csv")
  return data

# TRANSFORM

In [5]:
@task
def transform(
    data: pd.DataFrame
    ) -> pd.DataFrame:

  # Placeholder: no transformation yet, the data passes through unchanged
  return data

# LOAD

In [6]:
# Store the S&P 500 stock price data in a CSV
@task
def load_sp500_prices(
    data: pd.DataFrame,
    path: str
    ) -> None:

  data.to_csv(
      path_or_buf=path,
      index=True
      )
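
The main program further down warns that relative paths do not work with deployments. One possible safeguard, sketched here under assumptions, is to normalize the path before it reaches the load task. `resolve_output_path` is a hypothetical helper, not part of the original flow; it resolves against the working directory at call time, so it complements rather than replaces overriding the parameter in the deployment YAML.

```python
from pathlib import Path


def resolve_output_path(path: str) -> Path:
    """Expand '~', make the path absolute, and create missing parent folders."""
    resolved = Path(path).expanduser().resolve()
    # Create the target folder up front so the CSV write does not fail on it.
    resolved.parent.mkdir(parents=True, exist_ok=True)
    return resolved
```

It could then be used when calling the task, for example `load_sp500_prices(data=df, path=str(resolve_output_path(path)))`.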

# PREFECT FLOW


In [7]:
# main_flow() parameters are now changeable
@flow(
    name="Bitcoin Price Pipeline"
    )
def main_flow(
    tickers = "BTC-USD",
    period  = "1mo",
    interval= "5m",
    path   = "/content/sp500_prices.csv"
    ):

  print(">>> Extracting S&P 500 Stock Prices")
  df = extract_sp500_prices(
      tickers=tickers,
      period=period,
      interval=interval
  )

  print(">>> Doing Transform")
  df = transform(df)

  print(f">>> Storing S&P 500 Stock Prices: {path}")
  load_sp500_prices(
      data=df,
      path=path
  )

# MAIN PROGRAM

**GOALS**
- Handle API failure (retries)
- Move key parameters to `main_flow()`

In [8]:
if __name__=="__main__":
  main_flow(
      tickers  = "BTC-USD",
      period   = "24h",
      interval = "2m",
      # WARNING: Relative paths won't work with deployments
      # Solution is to override the parameters in the
      # deployment.YAML file with the absolute path
      path     = "/content/drive/MyDrive/Data Science Projects/ETL Data/sp500_prices.csv"
  )



>>> Extracting S&P 500 Stock Prices


[*********************100%%**********************]  1 of 1 completed


>>> Doing Transform


>>> Storing S&P 500 Stock Prices: /content/drive/MyDrive/Data Science Projects/ETL Data/sp500_prices.csv


# TESTING
`python ETL_flow_with_prefect.py`

# DEPLOYMENT STEPS & CLI COMMANDS:
1. BUILD:
    `prefect deployment build /03_prefect/flow_04_deployment.py:main_sp500_flow --name sp500_flow --interval 60`
  
2. PARAMETERS:
    In the generated deployment YAML file, set `path: '/users/eddiecheteni/Desktop/labs/sp500_prices.csv'`

3. APPLY:
    `prefect deployment apply main_sp500_flow-deployment.yaml`

4. LIST DEPLOYMENTS:
    `prefect deployment ls`

5. RUN:
    `prefect deployment run "S&P 500 Stock Price Pipeline/sp500_flow"`

6. ORION GUI:
    `prefect orion start`

7. AGENT START:
    `prefect agent start --work-queue "default"`

8. EXIT:
    `Ctrl + C`
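
The RUN step addresses a deployment as `<flow name>/<deployment name>`, so the flow name in the command has to match the `name` given to `@flow`, and the deployment name is the `--name` passed at build time. Because the flow name here contains spaces and an `&`, it also needs shell quoting. A small sketch of how the command is assembled, where `deployment_run_command` is a hypothetical helper:

```python
import shlex


def deployment_run_command(flow_name: str, deployment_name: str) -> str:
    """Build the shell command for `prefect deployment run`."""
    identifier = f"{flow_name}/{deployment_name}"
    # shlex.join quotes any argument containing spaces or shell metacharacters.
    return shlex.join(["prefect", "deployment", "run", identifier])


print(deployment_run_command("S&P 500 Stock Price Pipeline", "sp500_flow"))
# prefect deployment run 'S&P 500 Stock Price Pipeline/sp500_flow'
```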
