In [12]:
import numpy as np
import pandas as pd
import yfinance
from pymongo import MongoClient

# from dags.data_ELT_process import db_pass, db_user, db_name

In [13]:
# Function to pull data from MongoDB and convert it to a DataFrame
def pulling_data_db(host='localhost', port=27017, database='financeStockData',
                    collection='AAPL_stock_data'):
    """Load every document of a MongoDB collection into a pandas DataFrame.

    The defaults reproduce the original hard-coded target (the
    `AAPL_stock_data` collection of `financeStockData` on localhost:27017),
    so existing no-argument calls behave as before.

    Args:
        host: MongoDB host name.
        port: MongoDB port.
        database: database name.
        collection: collection name inside `database`.

    Returns:
        DataFrame with one row per document; Mongo's `_id` field is kept as
        a column.
    """
    # The context manager closes the client (and its connection pool) even if
    # the query raises; previously the client was never closed.
    with MongoClient(host, port) as client:
        # Materialise the cursor while the client is still open.
        documents = list(client[database][collection].find())

    return pd.DataFrame(documents)

In [14]:
# Pull the AAPL collection from MongoDB and preview the first rows.
fin_data = pulling_data_db()
fin_data.head()

Unnamed: 0,_id,Date,Open,High,Low,Close,Adj Close,Volume
0,6717b67af296b51242ed03dd,2023-10-23,170.910004,174.009995,169.929993,173.0,172.119263,55980100
1,6717b67af296b51242ed03de,2023-10-24,173.050003,173.669998,171.449997,173.440002,172.557007,43816600
2,6717b67af296b51242ed03df,2023-10-25,171.880005,173.059998,170.649994,171.100006,170.228943,57157000
3,6717b67af296b51242ed03e0,2023-10-26,170.369995,171.380005,165.669998,166.889999,166.040375,70625300
4,6717b67af296b51242ed03e1,2023-10-27,166.910004,168.960007,166.830002,168.220001,167.363586,58499100


In [15]:
# Schema check: dtypes and non-null counts. Note `Date` arrives as strings (object dtype).
fin_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 270 entries, 0 to 269
Data columns (total 8 columns):
 #   Column     Non-Null Count  Dtype  
---  ------     --------------  -----  
 0   _id        270 non-null    object 
 1   Date       270 non-null    object 
 2   Open       270 non-null    float64
 3   High       270 non-null    float64
 4   Low        270 non-null    float64
 5   Close      270 non-null    float64
 6   Adj Close  270 non-null    float64
 7   Volume     270 non-null    int64  
dtypes: float64(5), int64(1), object(2)
memory usage: 17.0+ KB


In [107]:
# Keep only the date and the adjusted close. `.copy()` makes this an
# independent frame, so the columns that later cells add to it do not go
# through pandas' chained-assignment (SettingWithCopy) path.
fin_data = fin_data[["Date", "Adj Close"]].copy()
fin_data.plot(kind='line', title = "checking for stationarity");

In [108]:
# Index the frame by trading date so the time-based resampling below works.
# Converting the existing RangeIndex (0..n-1) instead would produce timestamps
# a few nanoseconds after 1970-01-01, collapsing every row into one bucket.
fin_data.index = pd.to_datetime(fin_data['Date'])

In [109]:
# Mean adjusted close per calendar week ('W'), month ('M') and day ('D');
# requires `fin_data` to have a DatetimeIndex.
adj_close = fin_data['Adj Close']
weekly_price, monthly_price, daily_price = (
    adj_close.resample(rule).mean() for rule in ('W', 'M', 'D')
)

print(f"Weekly average price of stock is {weekly_price}")
print(f"Monthly average price of stock is {monthly_price}")
print(f"Daily average price of stock is {daily_price}")

In [110]:
# Parse the date strings once and derive calendar features from them.
parsed_dates = pd.to_datetime(fin_data['Date'])
fin_data = fin_data.assign(
    Date=parsed_dates,
    month=parsed_dates.dt.month,
    day=parsed_dates.dt.day,
    year=parsed_dates.dt.year,
)
fin_data.head()

In [111]:
# The raw date column is no longer needed: month/day/year now carry it.
fin_data = fin_data.drop('Date', axis='columns')
fin_data.head()

In [112]:
# Count fully duplicated rows. month/day/year are part of the comparison, so a
# non-zero count would suggest the same trading day was loaded more than once.
fin_data.duplicated().sum()

In [113]:
# Summary statistics of the adjusted close (count, mean, std, min, quartiles, max).
fin_data['Adj Close'].describe()

In [114]:
# 1-D array of the adjusted close only, as input to the ADF test.
# Select the column explicitly: flattening the whole frame would interleave the
# price with the month/day/year feature columns into one meaningless series.
df = fin_data['Adj Close'].to_numpy()

In [115]:
from statsmodels.tsa.stattools import adfuller

def adf_test(timeseries):
    """Print an Augmented Dickey-Fuller stationarity report for `timeseries`.

    Runs `adfuller` with AIC-based lag selection. Prints the test statistic,
    p-value, lag count, observation count and critical values as a pandas
    Series, then a one-line verdict at the 5% significance level.
    Returns None; the report is printed only.
    """
    print('Results of Dickey-Fuller Test:')
    stat, p_value, used_lag, n_obs, critical_values, *_ = adfuller(timeseries, autolag='AIC')

    labels = ['Test Statistic', 'p-value', '#Lags Used', 'Number of Observations Used']
    report = pd.Series([stat, p_value, used_lag, n_obs], index=labels)
    for level, threshold in critical_values.items():
        report[f'Critical Value ({level})'] = threshold

    print(report)

    # H0 of the ADF test is a unit root (non-stationary); a small p-value rejects it.
    if report['p-value'] < 0.05:
        conclusion = "\nConclusion: Reject the null hypothesis. The time series is stationary."
    else:
        conclusion = "\nConclusion: Cannot reject the null hypothesis. The time series is non-stationary."
    print(conclusion)


In [116]:
# ADF test on the price series prepared in the previous cell.
adf_test(df)

In [117]:
season_length = 3  # differencing lag in rows; adjust as needed

# Difference the adjusted close at that lag to remove the trend, dropping the
# leading rows that have no lagged partner.
adj_close_series = fin_data['Adj Close']
df_diff = adj_close_series.diff(season_length).dropna()
df_diff

In [118]:
# Plot the lag-differenced series to eyeball whether the trend has been removed.
df_diff.plot(kind="line")

In [119]:
# Repeat the ADF test, now on the differenced series.
adf_test(df_diff)

In [120]:
# Two-point moving average to damp noise in the differenced series; the first
# value (incomplete window) is NaN and is dropped. Adjust the window as needed.
df_smoothed = df_diff.rolling(window=2).mean().dropna()

In [121]:
# Checking for trend in the time series data
from statsmodels.tsa.seasonal import seasonal_decompose
import matplotlib.pyplot as plt

# Additive decomposition of the smoothed series.
# NOTE(review): period=12 is an arbitrary cycle for daily prices — confirm intent.
decompose_period = 12
decomposition = seasonal_decompose(df_smoothed, model='additive', period=decompose_period)

# Render the decomposed components
fig = decomposition.plot()
plt.show()

In [122]:
# Inspect the trend component extracted by seasonal_decompose.
decomposition.trend

In [123]:
# Plot the differenced and smoothed series that is scaled and modelled below.
df_smoothed.plot(kind='line', title="Stationarity plot");

In [124]:
# Normalizing data
from sklearn.preprocessing import StandardScaler

def scaling_data(data):
    """Standardise `data` with a freshly fitted StandardScaler.

    The scaler is fitted on `data` itself and then discarded, so the same
    mean/std cannot be re-applied to other data afterwards.
    NOTE(review): fitting on the full series before the train/test split
    leaks test statistics into training — consider fitting on train only.
    """
    return StandardScaler().fit_transform(data)

In [125]:
# StandardScaler expects a 2-D (n_samples, n_features) array: one column here.
reshap = np.asarray(df_smoothed).reshape(-1, 1)
scalled = scaling_data(reshap)
df = pd.DataFrame(scalled)

In [126]:
# Preview the standardised values (a single column labelled 0).
df.head()

In [127]:
# Chronological hold-out: the first 200 rows for training, the rest for testing.
# `.loc` slicing is label-inclusive, so `df.loc[0:199]` / `df.loc[199:]` put
# row 199 in BOTH sets; positional `.iloc` keeps the two parts disjoint.
train_rows = 200
X_train = df.iloc[:train_rows]
X_test = df.iloc[train_rows:]
assert len(X_train) + len(X_test) == len(df), "train/test split must partition df"

In [128]:
# Shapes of the train and test parts, one per line.
print(X_train.shape, X_test.shape, sep="\n")

In [129]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import LSTM

In [130]:
# Using multi-step forecasting method: each sample is `sequence_length`
# consecutive observations and its target is the next `forecast_horizon` ones.

sequence_length = 30
forecast_horizon = 7

def create_sequence(data, sequence_length, horizon):
    """Slice an ordered series into (input window, multi-step target) pairs.

    Args:
        data: ordered, sliceable data (list, numpy array, or a pandas object
            sliced by row position as in this notebook).
        sequence_length: number of past observations per input window.
        horizon: number of future observations per target.

    Returns:
        (X, y) numpy arrays where X[i] == data[i : i + sequence_length] and
        y[i] == data[i + sequence_length : i + sequence_length + horizon].
        Both are empty when `data` is shorter than sequence_length + horizon.
    """
    X, y = [], []

    for start in range(len(data) - sequence_length - horizon + 1):
        window_end = start + sequence_length
        X.append(data[start:window_end])
        # The target must end at window_end + horizon. The previous code used
        # the global `season_length` here instead of `sequence_length`, which
        # made every target slice empty.
        y.append(data[window_end:window_end + horizon])

    return np.array(X), np.array(y)

In [131]:
# Build windowed (X, y) samples from the 200-row training slice `X_train`;
# the chronological 80/20 split in the next cell is applied to these windows.
X, y = create_sequence(data=X_train, sequence_length=sequence_length, horizon=forecast_horizon)

In [134]:
# Chronological 80/20 split of the windowed samples (no shuffling, so the
# test windows always come after the training windows).
split = int(0.8 * len(X))
X_train, X_test = X[:split], X[split:]
y_train, y_test = y[:split], y[split:]

In [135]:
# Two stacked 50-unit LSTM layers feeding a dense head whose width equals
# y_train.shape[1]; trained with MSE loss and the Adam optimiser.
n_timesteps, n_features = X_train.shape[1], X_train.shape[2]
model = Sequential([
    LSTM(units=50, return_sequences=True, input_shape=(n_timesteps, n_features)),
    LSTM(units=50),
    Dense(y_train.shape[1]),  # Output units match the target dimension
])

model.compile(loss='mean_squared_error', optimizer='adam')

model.fit(X_train, y_train, epochs=10, batch_size=32, validation_split=0.1, verbose=1)

In [7]:
import json
import psycopg2
import os

# Load the Postgres credentials from `secret.json`. The location can be
# overridden with the SECRETS_PATH environment variable so the notebook is not
# tied to one machine; the default is the original absolute path.
secrets_path = os.environ.get('SECRETS_PATH', '/home/oem/PycharmProjects/data-eng-task/dags/secret.json')
with open(secrets_path, encoding='utf-8') as f:
    secrets = json.load(f)

# Access secrets (raises KeyError if a key is missing from the file)
db_name = secrets['DB_NAME']
db_user = secrets['DB_USER']
db_pass = secrets['DB_PASS']

In [5]:
class DatabaseHandler:
    """Small psycopg2 helper for the Postgres stock tables on localhost:5432.

    Each method opens its own connection and closes it before returning,
    including when the query raises.
    """

    def __init__(self, db_name, db_user, db_pass):
        self.db_name = db_name
        self.db_user = db_user
        self.db_pass = db_pass

    def connect_postgres(self):
        """Open and return a new psycopg2 connection; the caller must close it."""
        return psycopg2.connect(dbname=self.db_name, user=self.db_user, host="localhost", port="5432",
                                password=self.db_pass)

    def get_last_saved_date(self, table_name):
        """Return MAX(Date) from `table_name` (None when SQL yields NULL, e.g. an empty table).

        NOTE(review): `table_name` is interpolated directly into the SQL text,
        so it must come from trusted code, never from user input
        (psycopg2.sql.Identifier would be the safe alternative).
        """
        conn = self.connect_postgres()
        try:
            cur = conn.cursor()
            try:
                cur.execute(f"SELECT MAX(Date) FROM {table_name};")
                return cur.fetchone()[0]
            finally:
                cur.close()
        finally:
            conn.close()

    def insert_data_postgres(self, new_data):
        """Insert rows into `apple_stock_data` in a single transaction.

        Args:
            new_data: iterable of mappings, each with the keys "Date", "Open",
                "High", "Low", "Close" and "Volume". NOTE(review): a pandas
                DataFrame cannot be passed directly (iterating it yields column
                names); presumably callers pass `df.to_dict("records")` — confirm.

        The transaction is committed on success. On error it is rolled back and
        the exception re-raised. The connection is closed in both cases.
        """
        insert_query = """INSERT INTO apple_stock_data(Date, Open, High, Low, Close, Volume) VALUES(%s, %s, %s, %s, %s, %s)"""

        # Prepare dataset for insertion before connecting, so a malformed row
        # cannot leave a connection open. Tuple order matches insert_query.
        dataset = [(row["Date"], row["Open"], row["High"], row["Low"], row["Close"], row["Volume"]) for row in new_data]

        conn = self.connect_postgres()
        try:
            cur = conn.cursor()
            try:
                cur.executemany(insert_query, dataset)  # Insert all at once
                conn.commit()  # Commit the transaction
            finally:
                cur.close()
        except Exception:
            conn.rollback()  # discard the partial transaction before closing
            raise
        finally:
            conn.close()

        print(f"{len(dataset)} records inserted into apple_stock_data.")
    