In [1]:
import sqlite3
import pandas as pd
import numpy as np
from newsapi import newsapi_client
import os
import yfinance as yf
import sqlite3

Data Collection & API set up

In [5]:
api_key = os.getenv('NEWSAPI_KEY') # 환경 변수 설정
auth = newsapi_client.NewsApiClient(api_key=api_key)

In [6]:
news1 = auth.get_everything(q="Trump",from_param="2024-11-12",to="2024-11-13",language="en",sort_by="relevancy",page_size=5)
news2 = auth.get_everything(q="Trump",from_param="2024-11-11",to="2024-11-12",language="en",sort_by="relevancy",page_size=5)
news3 = auth.get_everything(q="Trump",from_param="2024-11-10",to="2024-11-11",language="en",sort_by="relevancy",page_size=5)

In [7]:
aapl = yf.download(tickers='AAPL', start='2024-11-12',end='2024-11-13',interval='15m')
aapl.columns = aapl.columns.droplevel("Ticker")

msft = yf.download(tickers='MSFT', start='2024-11-12',end='2024-11-13',interval='15m')
msft.columns = msft.columns.droplevel("Ticker")

[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed


In [8]:
class DataLake:
    def __init__(self,db_name="datalake.db"):
        self.raw_data = {} # raw json type
        self.processed_data = {}
        self.db_name = db_name
        
    def access_control(func):
        def wrapper(*args, **kwargs):
            password = "1234"
            input_pw = input("Type in your password")
            if input_pw == password:
                result = func(*args,**kwargs)
            else:
                result = "You have no access to this DataLake!"
            return result
        return wrapper
        
    @access_control
    def store_data(self, dataset_name, data, processed=False): # adding data
        if processed:
            with sqlite3.connect(self.db_name) as conn:
                if dataset_name not in self.processed_data:
                    self.processed_data[dataset_name] = data
                    data.to_sql(dataset_name, conn, index=False, if_exists='replace')
                else:  # Append new data if table already exists
                    self.processed_data[dataset_name] = pd.concat([self.processed_data[dataset_name], data])
                    data.to_sql(dataset_name, conn, index=False, if_exists='append')

        else:
            if dataset_name not in self.raw_data:
                self.raw_data[dataset_name] = data
            else: # preventing overrridinng
                self.raw_data[dataset_name] += data
    
    @access_control
    def retrieve_data(self, dataset_name, processed=False, sql_query = None): # for data filtering and extraction
        if processed: # assuming value for each processed key is a dataframe
            with sqlite3.connect(self.db_name) as conn:
                if sql_query is None:
                    sql_query = f"SELECT * FROM {dataset_name}"
                try:
                    query_data = pd.read_sql_query(sql_query, conn)
                    return query_data
                except Exception as e:
                    print(f"Error: {e}")
                    return None
            
        else: # if raw data just return its raw data
            return self.raw_data[dataset_name]
        
    
    

In [9]:
test = DataLake()
test.store_data("test",news1['articles'])
data = test.retrieve_data("test")
test.store_data("test1",news2['articles'])
test.store_data("test2",news3['articles'])
test.store_data("aapl",aapl,processed=True)
test.store_data("msft",msft,processed=True)

In [10]:
test.retrieve_data("aapl",processed=True,sql_query="SELECT * FROM aapl,msft WHERE aapl.Volume<1000000 AND msft.Volume<1000000 ")

Unnamed: 0,Adj Close,Close,High,Low,Open,Volume,Adj Close.1,Close.1,High.1,Low.1,Open.1,Volume.1
0,224.169998,224.169998,224.235001,223.690002,223.914993,863846,418.744995,418.744995,420.010010,418.559998,419.540710,716553
1,224.169998,224.169998,224.235001,223.690002,223.914993,863846,420.200012,420.200012,420.260010,417.709991,418.799988,613220
2,224.169998,224.169998,224.235001,223.690002,223.914993,863846,418.989990,418.989990,420.299988,418.809998,420.200012,400057
3,224.169998,224.169998,224.235001,223.690002,223.914993,863846,419.730011,419.730011,419.965393,418.750000,418.980011,346740
4,224.169998,224.169998,224.235001,223.690002,223.914993,863846,419.000214,419.000214,420.000000,418.820007,419.765015,450805
...,...,...,...,...,...,...,...,...,...,...,...,...
355,224.600006,224.600006,224.820007,224.460007,224.500000,763948,423.265015,423.265015,423.399994,422.899994,423.309998,236394
356,224.600006,224.600006,224.820007,224.460007,224.500000,763948,423.709991,423.709991,423.839996,423.079987,423.282501,292904
357,224.600006,224.600006,224.820007,224.460007,224.500000,763948,424.007690,424.007690,424.439911,423.630096,423.739990,369294
358,224.600006,224.600006,224.820007,224.460007,224.500000,763948,423.552002,423.552002,424.049988,423.300293,423.950012,558675


In [37]:
class DataCategory:

    def __init__(self, name):
        self.name = name
        self.datasets = {}  # Dictionary to store datasets with their metadata 

    def add_dataset(self, dataset_name, metadata=None):
        """
        param metadata: Metadata dictionary (e.g., description, parameters, etc.).
        """
        if dataset_name not in self.datasets:
            self.datasets[dataset_name] = metadata or {}

    def search(self, keyword):
        """
        Search for datasets within the category by keyword.
        :param keyword: Keyword to search in dataset names or metadata.
        :return: List of matching datasets.
        """
        results = []
        for dataset_name, metadata in self.datasets.items():
            if keyword.lower() in dataset_name.lower() or any(
                keyword.lower() in str(value).lower() for value in metadata.values()
            ):
                results.append((dataset_name, metadata))
        return results


class DataCatalog:
    """
    Organizes datasets into categories and provides metadata for easy discovery.
    """
    def __init__(self, data_lake):
        """
        Initialize the catalog with a reference to the DataLake.
        :param data_lake: An instance of the DataLake class.
        """
        self.categories = {}
        self.data_lake = data_lake  # Link to the DataLake instance

    def add_category(self, category_name):
        """
        Add a new category to the catalog.
        :param category_name: Name of the category.
        """
        if category_name not in self.categories:
            self.categories[category_name] = DataCategory(category_name)

    def add_dataset(self, category_name, dataset_name, data, metadata=None, processed=False):

        if category_name not in self.categories:
            self.add_category(category_name)

        # Add dataset to the category
        self.categories[category_name].add_dataset(dataset_name, metadata)

        # Store the dataset in the DataLake
        self.data_lake.store_data(dataset_name, data, processed=processed)

    def list_datasets(self, category_name): # 카테고리 내 모든 table

        if category_name in self.categories:
            return [
                {"dataset_name": name, "metadata": metadata}
                for name, metadata in self.categories[category_name].datasets.items()
            ]
        return f"Category '{category_name}' not found."

    def search_data(self, keyword):

        results = []
        for category_name, category in self.categories.items():
            matches = category.search(keyword) # table명 match
            for dataset_name, metadata in matches:
                results.append({
                    "category": category_name,
                    "dataset_name": dataset_name,
                    "metadata": metadata
                })
        return results

    def retrieve_dataset(self, dataset_name, processed=False, sql_query=None):
        
        return self.data_lake.retrieve_data(dataset_name, processed=processed, sql_query=sql_query)


In [38]:
data_catalog = DataCatalog(test)
data_catalog

<__main__.DataCatalog at 0x18b3d7366c0>

In [41]:
data_catalog.add_dataset(
    "Equities", "aapl", aapl,
    metadata={"description": "Apple stock data", "parameters": ["Adj Close", "Close","High","Low","Open","Volume"]},
    processed=True
)
data_catalog.add_dataset(
    "Equities", "msft", msft,
    metadata={"description": "Microsoft stock data", "parameters": ["Adj Close", "Close","High","Low","Open","Volume"]},
    processed=True
)

In [42]:
# List datasets in a category
print("Datasets in 'Equities':", data_catalog.list_datasets("Equities"))

# Search for datasets
print("Search results for 'Apple':", data_catalog.search_data("Apple"))

# Retrieve a dataset with SQL query
query_result = data_catalog.retrieve_dataset("AAPL", processed=True, sql_query="SELECT * FROM AAPL WHERE Volume < 1000000")
print("Query result:\n", query_result)

Datasets in 'Equities': [{'dataset_name': 'aapl', 'metadata': {'description': 'Apple stock data', 'parameters': ['Date', 'Volume']}}, {'dataset_name': 'msft', 'metadata': {'description': 'Microsoft stock data', 'parameters': ['Date', 'Volume']}}]
Search results for 'Apple': [{'category': 'Equities', 'dataset_name': 'aapl', 'metadata': {'description': 'Apple stock data', 'parameters': ['Date', 'Volume']}}]
Query result:
      Adj Close       Close        High         Low        Open  Volume
0   224.169998  224.169998  224.235001  223.690002  223.914993  863846
1   224.919998  224.919998  225.199997  224.830002  224.999893  802025
2   223.919998  223.919998  224.839996  223.884995  224.794998  896875
3   224.130005  224.130005  224.399994  223.860001  223.919998  771170
4   223.419907  223.419907  224.149994  223.386002  224.139999  712087
5   223.863007  223.863007  223.897507  223.354996  223.419998  641269
6   224.470001  224.470001  224.470001  223.779999  223.860001  689858
7   224.

In [None]:
class DataWorkbench:

    def __init__(self):
        self.data_storage = {}
        
    def store_data(self, dataset_name, data):
        self.data_storage[dataset_name] = data

    def retrieve_data(self, dataset_name):
        return self.data_storage.get(dataset_name, "Dataset not found")

    def transform_data(self, dataset_name, transformation_func):
        data = self.retrieve_data(dataset_name)
        return transformation_func(data) if data else "Dataset not found"

In [None]:
class IntradayDataModel:

    def __init__(self, timestamp, price, volume, symbol):
        self.timestamp = timestamp
        self.price = price
        self.volume = volume
        self.symbol = symbol

    def aggregate_by_interval(self, interval):
        # Implement aggregation logic here

        pass

In [None]:
class NewsDataModel:

    def __init__(self, timestamp, headline, sentiment_score, relevance):
        self.timestamp = timestamp
        self.headline = headline
        self.sentiment_score = sentiment_score
        self.relevance = relevance

    def filter_by_sentiment(self, threshold):
        # Implement sentiment filtering here
        pass