In [75]:
# Import the libraries

from airflow import DAG 
from datetime import datetime, timedelta
from airflow.operators.python import PythonOperator
import requests
import json
import time 
import pandas as pd
import os
import numpy as np

In [37]:
# Fetch weekly IBM prices from the Alpha Vantage API and save the raw JSON to the data lake
def import_IBM_data(symbol='IBM', path='/home/unix/data_center/data_lake', api_key=None, timeout=30):
    """Fetch weekly prices for ``symbol`` from Alpha Vantage and store the raw JSON.

    The payload is written to ``<path>/stock_market_raw_data<symbol>_<unix time>``,
    e.g. ``data_lake/stock_market_raw_dataIBM_1697111940.3560448``. That name can be
    parsed back by splitting on ``'_'`` and reading the last piece as a float timestamp.

    Args:
        symbol: Ticker to request (default ``'IBM'``).
        path: Directory of the raw data lake; created if missing.
        api_key: Alpha Vantage key. Defaults to the ``ALPHAVANTAGE_API_KEY``
            environment variable, then to the legacy hard-coded key.
        timeout: Seconds to wait for the HTTP response before giving up.

    Returns:
        The path of the file that was written (pushed to XCom by Airflow).

    Raises:
        requests.HTTPError: on a non-2xx HTTP response.
        ValueError: if the body is not JSON or has no ``'Weekly Time Series'``
            section (e.g. an API error or rate-limit message). Raising here lets
            Airflow mark the task failed and retry, instead of silently succeeding.
    """
    if api_key is None:
        # NOTE(review): the literal fallback only keeps existing deployments working;
        # set ALPHAVANTAGE_API_KEY, rotate this key and remove the fallback.
        api_key = os.environ.get('ALPHAVANTAGE_API_KEY', 'SWSFGG5JHXFNNZAO')

    response = requests.get(
        'https://www.alphavantage.co/query',
        params={'function': 'TIME_SERIES_WEEKLY', 'symbol': symbol, 'apikey': api_key},
        timeout=timeout,
    )
    response.raise_for_status()
    data = response.json()
    if 'Weekly Time Series' not in data:
        raise ValueError(f'Unexpected Alpha Vantage payload for {symbol!r}: {str(data)[:200]}')

    os.makedirs(path, exist_ok=True)
    # os.path.join supplies the missing directory separator; the '_' before the
    # timestamp is what lets readers recover it with split('_')[-1].
    out_file = os.path.join(path, 'stock_market_raw_data' + symbol + '_' + str(time.time()))
    with open(out_file, 'w') as outfile:
        json.dump(data, outfile)
    return out_file

In [38]:
# Default keyword arguments handed to the DAG's `default_args` below.
default_dag_arg = dict(
    owner='lorenzo',
    start_date=datetime(2023, 10, 10),
    email_on_failure=False,
    email_on_retry=False,
    retries=1,
    retry_delay=timedelta(minutes=5),
    # NOTE(review): the only task here is a PythonOperator, which takes no
    # project_id; confirm this entry is still needed.
    project_id=1,
)

In [39]:
# Daily DAG with a single task that pulls the raw IBM payload into the data lake.
dag = DAG(
    # NOTE(review): "fatch" looks like a typo for "fetch"; renaming the id would
    # create a new DAG in Airflow and detach the existing run history.
    dag_id='fatch_IBM_data',
    default_args=default_dag_arg,
    schedule_interval='@daily',
    catchup=False,
)

task_1 = PythonOperator(
    task_id='fatch_data',
    python_callable=import_IBM_data,
    dag=dag,
)

In [40]:
# Ad-hoc exploration: fetch the weekly IBM series interactively.
# NOTE(review): the key comes from ALPHAVANTAGE_API_KEY; the literal fallback is a
# key committed in plain text; rotate it and drop the fallback.
api_key = os.environ.get('ALPHAVANTAGE_API_KEY', 'SWSFGG5JHXFNNZAO')
url = 'https://www.alphavantage.co/query?function=TIME_SERIES_WEEKLY&symbol=IBM&apikey=' + api_key

# A timeout keeps the cell from hanging forever; raise_for_status surfaces HTTP
# errors here instead of as a confusing KeyError in later cells.
request = requests.get(url, timeout=30)
request.raise_for_status()

In [41]:
# Parse the response body; the next cells show it has 'Meta Data' and 'Weekly Time Series' sections.
data = request.json()

In [42]:
# Show the top-level sections of the payload.
data.keys()

dict_keys(['Meta Data', 'Weekly Time Series'])

In [43]:
# Request metadata: description, symbol, last refresh date and time zone.
data['Meta Data']

{'1. Information': 'Weekly Prices (open, high, low, close) and Volumes',
 '2. Symbol': 'IBM',
 '3. Last Refreshed': '2023-10-11',
 '4. Time Zone': 'US/Eastern'}

In [44]:
# One entry per week, keyed by a 'YYYY-MM-DD' date string (newest first in the output below).
data['Weekly Time Series'].keys()

dict_keys(['2023-10-11', '2023-10-06', '2023-09-29', '2023-09-22', '2023-09-15', '2023-09-08', '2023-09-01', '2023-08-25', '2023-08-18', '2023-08-11', '2023-08-04', '2023-07-28', '2023-07-21', '2023-07-14', '2023-07-07', '2023-06-30', '2023-06-23', '2023-06-16', '2023-06-09', '2023-06-02', '2023-05-26', '2023-05-19', '2023-05-12', '2023-05-05', '2023-04-28', '2023-04-21', '2023-04-14', '2023-04-06', '2023-03-31', '2023-03-24', '2023-03-17', '2023-03-10', '2023-03-03', '2023-02-24', '2023-02-17', '2023-02-10', '2023-02-03', '2023-01-27', '2023-01-20', '2023-01-13', '2023-01-06', '2022-12-30', '2022-12-23', '2022-12-16', '2022-12-09', '2022-12-02', '2022-11-25', '2022-11-18', '2022-11-11', '2022-11-04', '2022-10-28', '2022-10-21', '2022-10-14', '2022-10-07', '2022-09-30', '2022-09-23', '2022-09-16', '2022-09-09', '2022-09-02', '2022-08-26', '2022-08-19', '2022-08-12', '2022-08-05', '2022-07-29', '2022-07-22', '2022-07-15', '2022-07-08', '2022-07-01', '2022-06-24', '2022-06-17', '2022-06-

In [45]:
# Fields recorded for each week: open, high, low, close, volume.
# NOTE(review): '2023-10-11' is this snapshot's last refresh date; a re-fetch on a
# later day will not contain it. next(iter(...)) would survive that.
data['Weekly Time Series']['2023-10-11'].keys()

dict_keys(['1. open', '2. high', '3. low', '4. close', '5. volume'])

In [49]:
# Transpose so weeks become rows (indexed by date string) and price fields become columns.
# NOTE(review): values presumably stay as the JSON strings; convert with pd.to_numeric before arithmetic.
clean_data = pd.DataFrame(data['Weekly Time Series']).T

In [55]:
# Tag every row with the ticker symbol from the metadata.
# NOTE(review): the column name 'ticke' looks like a typo for 'ticker'; format_data uses the same name.
clean_data['ticke'] = data['Meta Data']['2. Symbol']

In [58]:
# Keep the request metadata on every row, as the dict's string repr.
clean_data['meta_data'] = str(data['Meta Data'])

In [63]:
# Record when this snapshot was built; the same value is broadcast to every row.
clean_data['time_stamp'] = pd.to_datetime('now')

In [64]:
# Display the assembled frame (clean_data.head() would keep the output short).
clean_data

Unnamed: 0,1. open,2. high,3. low,4. close,5. volume,ticke,meta_data,time_stamp
2023-10-11,142.3000,143.4150,140.6800,143.2300,7881639,IBM,"{'1. Information': 'Weekly Prices (open, high,...",2023-10-12 15:13:58.914943
2023-10-06,140.0400,142.9400,139.8600,142.0300,15932918,IBM,"{'1. Information': 'Weekly Prices (open, high,...",2023-10-12 15:13:58.914943
2023-09-29,146.5700,147.4300,139.6100,140.3000,23445425,IBM,"{'1. Information': 'Weekly Prices (open, high,...",2023-10-12 15:13:58.914943
2023-09-22,145.7700,151.9299,144.6600,146.9100,23597168,IBM,"{'1. Information': 'Weekly Prices (open, high,...",2023-10-12 15:13:58.914943
2023-09-15,148.5700,148.7800,145.5300,145.9900,19316647,IBM,"{'1. Information': 'Weekly Prices (open, high,...",2023-10-12 15:13:58.914943
...,...,...,...,...,...,...,...,...
1999-12-10,113.0000,122.1200,107.5600,109.0000,58626000,IBM,"{'1. Information': 'Weekly Prices (open, high,...",2023-10-12 15:13:58.914943
1999-12-03,104.9400,112.8700,102.1200,111.8700,37670000,IBM,"{'1. Information': 'Weekly Prices (open, high,...",2023-10-12 15:13:58.914943
1999-11-26,105.5000,109.8700,101.8100,105.0000,37165600,IBM,"{'1. Information': 'Weekly Prices (open, high,...",2023-10-12 15:13:58.914943
1999-11-19,96.0000,105.1200,92.6200,103.9400,61550800,IBM,"{'1. Information': 'Weekly Prices (open, high,...",2023-10-12 15:13:58.914943


In [65]:
def _load_latest_snapshot(read_path, ticker):
    """Return the parsed JSON of the newest raw snapshot for ``ticker`` in ``read_path``.

    Snapshot names end in ``_<unix timestamp>``. Files containing ``ticker`` but
    lacking a numeric suffix are ignored.

    Raises:
        FileNotFoundError: if no matching snapshot exists.
    """
    candidates = []
    for name in os.listdir(read_path):
        if ticker not in name:
            continue
        try:
            stamp = float(name.rsplit('_', 1)[-1])
        except ValueError:
            continue  # not a timestamp-suffixed snapshot
        candidates.append((stamp, name))
    if not candidates:
        raise FileNotFoundError(f'No raw {ticker} snapshot found in {read_path!r}')
    _, latest_name = max(candidates)
    with open(os.path.join(read_path, latest_name)) as fh:
        return json.load(fh)


def format_data(data=None, output_path='', read_path='/home/unix/data_center/data_lake/', ticker='IBM'):
    """Flatten an Alpha Vantage weekly payload into a table and save it as CSV.

    Args:
        data: Parsed TIME_SERIES_WEEKLY JSON with ``'Meta Data'`` and
            ``'Weekly Time Series'`` sections. When ``None``, the newest raw
            snapshot for ``ticker`` in ``read_path`` is loaded instead.
        output_path: Directory for the CSV; ``''`` means the current directory.
        read_path: Raw data-lake directory searched when ``data`` is ``None``.
        ticker: Substring that identifies snapshot files when ``data`` is ``None``.

    Returns:
        The formatted DataFrame: one row per week, indexed by date string, with
        the price fields plus ``ticke``, ``meta_data`` and ``time_stamp`` columns.

    Raises:
        FileNotFoundError: if ``data`` is ``None`` and no snapshot for ``ticker`` exists.
    """
    # Input
    if data is None:
        data = _load_latest_snapshot(read_path, ticker)

    symbol = data['Meta Data']['2. Symbol']
    # Take one timestamp so the column and the file name agree.
    snapshot_time = pd.to_datetime('now')

    # Format data: weeks become rows, price fields become columns.
    clean_data = pd.DataFrame(data['Weekly Time Series']).T
    clean_data['ticke'] = symbol  # NOTE(review): likely meant 'ticker'; kept for existing readers
    clean_data['meta_data'] = str(data['Meta Data'])
    clean_data['time_stamp'] = snapshot_time

    # Output - store the snapshot
    out_file = os.path.join(output_path, symbol + '_snapshot_daily_' + str(snapshot_time))
    clean_data.to_csv(out_file)
    return clean_data
    

In [96]:
# Logic handler: load the newest raw snapshot for `ticker` from the data lake.
read_path = '/home/unix/data_center/data_lake/'
ticker = 'IBM'

# Map each snapshot file name to the unix timestamp after its last '_'. Files
# without a numeric suffix are skipped instead of crashing the cell.
snapshots = {}
for candidate in os.listdir(read_path):
    if ticker not in candidate:
        continue
    try:
        snapshots[candidate] = float(candidate.split('_')[-1])
    except ValueError:
        continue
if not snapshots:
    raise FileNotFoundError(f'No {ticker} snapshot with a timestamp suffix in {read_path}')

# Pick the file by its own timestamp. Re-matching str(latest) against every name
# in the directory could select another ticker's file.
latest_file = max(snapshots, key=snapshots.get)
latest = snapshots[latest_file]

# Close the handle once parsed; `file` stays bound for inspection below.
with open(os.path.join(read_path, latest_file)) as file:
    data = json.load(file)

In [98]:
# Inspect `file` from the cell above; its name shows which snapshot was loaded.
file

<_io.TextIOWrapper name='/home/unix/data_center/data_lake/stock_market_raw_dataIBM_1697111940.3560448' mode='r' encoding='UTF-8'>