## Importing necessary libraries

In [1]:
import sqlite3
import pandas as pd
import random
from datetime import datetime, timedelta
import math

## 1. Singleton Pattern for Database Connection

In [2]:
class DatabaseConnection:
    _instance = None
    _initialized = False

    def __new__(cls, *args, **kwargs):
        if cls._instance is None:
            cls._instance = super(DatabaseConnection, cls).__new__(cls)
        return cls._instance

    def __init__(self):
        if not DatabaseConnection._initialized:
            self._connection = sqlite3.connect('company_database.db')
            DatabaseConnection._initialized = True

    @staticmethod
    def getInstance():
        if DatabaseConnection._instance is None:
            DatabaseConnection._instance = DatabaseConnection()
        return DatabaseConnection._instance

    def getConnection(self):
        return self._connection

    def close(self):
        if hasattr(self, '_connection') and self._connection:
            self._connection.close()
            DatabaseConnection._initialized = False
            DatabaseConnection._instance = None
            print("Database connection closed.")
        else:
            raise Exception("No connection to close.")

## 2. Strategy Pattern for Financial Analysis

In [3]:
# Strategy Interface
class AnalysisStrategy:
    """Strategy interface that declares operations common to all supported algorithms"""
    def calculate(self, time_series):
        """Calculate analytical metrics based on time series data"""
        raise NotImplementedError("Subclasses must implement calculate()")

# Concrete Strategy Classes
class BollingerBandsStrategy(AnalysisStrategy):
    """Concrete strategy implementing the Bollinger Bands algorithm"""
    def __init__(self, window_size=20, width=2):
        self.window_size = window_size
        self.width = width

    def calculate(self, time_series):
        rolling_mean = time_series['value'].rolling(self.window_size).mean()
        rolling_std = time_series['value'].rolling(self.window_size).std()

        return {
            'moving_average': rolling_mean,
            'high_bollinger': rolling_mean + (rolling_std * self.width),
            'low_bollinger': rolling_mean - (rolling_std * self.width)
        }

class SimpleMovingAverageStrategy(AnalysisStrategy):
    """Concrete strategy implementing a simple moving average algorithm"""
    def __init__(self, window_size=20):
        self.window_size = window_size

    def calculate(self, time_series):
        moving_avg = time_series['value'].rolling(self.window_size).mean()
        return {
            'moving_average': moving_avg,
            'high_bollinger': None,
            'low_bollinger': None
        }

# Context Class
class FinancialAnalyzer:
    """Context that maintains a reference to a Strategy object"""
    def __init__(self, strategy=None):
        self._strategy = strategy

    def set_strategy(self, strategy):
        """Allows changing the strategy object at runtime"""
        self._strategy = strategy

    def analyze(self, time_series):
        """Delegates algorithm execution to the strategy object"""
        if not self._strategy:
            raise ValueError("Strategy not set")
        return self._strategy.calculate(time_series)

## 3. Factory Pattern

In [4]:
# Step 1: Product Interface
class ICompany:
    """Interface for Company products"""

    def get_id(self):
        pass

    def get_ticker(self):
        pass

    def get_name(self):
        pass

    def load_time_series(self):
        pass

    def analyze(self, strategy):
        pass


# Step 2: Concrete Products
class StandardCompany(ICompany):
    def __init__(self, company_id, ticker, name):
        self.company_id = company_id
        self.ticker = ticker
        self.name = name
        self.time_series = None
        self.high_bollinger = None
        self.low_bollinger = None
        self.moving_average = None
        self.grade = None

    def get_id(self):
        return self.company_id

    def get_ticker(self):
        return self.ticker

    def get_name(self):
        return self.name

    def load_time_series(self):
        conn = DatabaseConnection.getInstance().getConnection()
        query = '''
                SELECT date, value
                FROM TimeSeries
                WHERE company_id = ?
                ORDER BY date
                '''
        self.time_series = pd.read_sql_query(query, conn, params=(self.company_id,))
        self.time_series['date'] = pd.to_datetime(self.time_series['date'])
        return self

    def analyze(self, strategy):
        if self.time_series is None:
            self.load_time_series()

        results = strategy.calculate(self.time_series)
        self.moving_average = results['moving_average']
        self.high_bollinger = results['high_bollinger']
        self.low_bollinger = results['low_bollinger']
        return self

    def assign_grade(self):
        latest_value = self.time_series['value'].iloc[-1]
        if latest_value > self.high_bollinger.iloc[-1]:
            self.grade = 'A'
        elif latest_value < self.low_bollinger.iloc[-1]:
            self.grade = 'C'
        else:
            self.grade = 'B'
        return self

    def display(self):
        print(f'Company: {self.name} ({self.ticker})')
        print(f'Grade: {self.grade}')
        print('Time Series Data:')
        print(self.time_series.tail())
        print('Moving Average:')
        print(self.moving_average.tail())
        print('High Bollinger Band:')
        print(self.high_bollinger.tail())
        print('Low Bollinger Band:')
        print(self.low_bollinger.tail())


class DomesticCompany(ICompany):
    def __init__(self, company_id, ticker, name):
        self.company_id = company_id
        self.ticker = ticker
        self.name = name
        self.time_series = None
        self.high_bollinger = None
        self.low_bollinger = None
        self.moving_average = None
        self.grade = None
        self.company_type = "Domestic"

    def get_id(self):
        return self.company_id

    def get_ticker(self):
        return self.ticker

    def get_name(self):
        return self.name

    def load_time_series(self):
        conn = DatabaseConnection.getInstance().getConnection()
        query = '''
                SELECT date, value
                FROM TimeSeries
                WHERE company_id = ?
                ORDER BY date
                '''
        self.time_series = pd.read_sql_query(query, conn, params=(self.company_id,))
        self.time_series['date'] = pd.to_datetime(self.time_series['date'])
        return self

    def analyze(self, strategy):
        if self.time_series is None:
            self.load_time_series()

        results = strategy.calculate(self.time_series)
        self.moving_average = results['moving_average']
        self.high_bollinger = results['high_bollinger']
        self.low_bollinger = results['low_bollinger']
        return self

    def assign_grade(self):
        latest_value = self.time_series['value'].iloc[-1]
        if latest_value > self.high_bollinger.iloc[-1]:
            self.grade = 'A'
        elif latest_value < self.low_bollinger.iloc[-1]:
            self.grade = 'C'
        else:
            self.grade = 'B'
        return self

    def display(self):
        print(f'Company: {self.name} ({self.ticker}) - Type: {self.company_type}')
        print(f'Grade: {self.grade}')
        print('Time Series Data:')
        print(self.time_series.tail())
        print('Moving Average:')
        print(self.moving_average.tail())
        print('High Bollinger Band:')
        print(self.high_bollinger.tail())
        print('Low Bollinger Band:')
        print(self.low_bollinger.tail())


class ForeignCompany(ICompany):
    def __init__(self, company_id, ticker, name):
        self.company_id = company_id
        self.name = name
        self.ticker = ticker  # Will always be 'ZZZZ'
        self.time_series = None
        self.high_bollinger = None
        self.low_bollinger = None
        self.moving_average = None
        self.grade = None
        self.company_type = "Foreign"

    # All other methods remain the same as DomesticCompany
    def get_id(self):
        return self.company_id

    def get_ticker(self):
        return self.ticker

    def get_name(self):
        return self.name

    def load_time_series(self):
        conn = DatabaseConnection.getInstance().getConnection()
        query = '''
                SELECT date, value
                FROM TimeSeries
                WHERE company_id = ?
                ORDER BY date
                '''
        self.time_series = pd.read_sql_query(query, conn, params=(self.company_id,))
        self.time_series['date'] = pd.to_datetime(self.time_series['date'])
        return self

    def analyze(self, strategy):
        if self.time_series is None:
            self.load_time_series()

        results = strategy.calculate(self.time_series)
        self.moving_average = results['moving_average']
        self.high_bollinger = results['high_bollinger']
        self.low_bollinger = results['low_bollinger']
        return self

    def assign_grade(self):
        latest_value = self.time_series['value'].iloc[-1]
        if latest_value > self.high_bollinger.iloc[-1]:
            self.grade = 'A'
        elif latest_value < self.low_bollinger.iloc[-1]:
            self.grade = 'C'
        else:
            self.grade = 'B'
        return self

    def display(self):
        print(f'Company: {self.name} (ID: {self.company_id}) - Type: {self.company_type}')
        print(f'Grade: {self.grade}')
        print('Time Series Data:')
        print(self.time_series.tail())
        print('Moving Average:')
        print(self.moving_average.tail())
        print('High Bollinger Band:')
        print(self.high_bollinger.tail())
        print('Low Bollinger Band:')
        print(self.low_bollinger.tail())


class CryptoCurrencyCompany(ICompany):
    def __init__(self, company_id, ticker, name):
        self.company_id = company_id
        self.ticker = ticker
        self.name = name
        self.time_series = None
        self.high_bollinger = None
        self.low_bollinger = None
        self.moving_average = None
        self.grade = None
        self.company_type = "Cryptocurrency"

    def get_id(self):
        return self.company_id

    def get_ticker(self):
        return self.ticker

    def get_name(self):
        return self.name

    def load_time_series(self):
        conn = DatabaseConnection.getInstance().getConnection()
        query = '''
                SELECT date, value
                FROM TimeSeries
                WHERE company_id = ?
                ORDER BY date
                '''
        self.time_series = pd.read_sql_query(query, conn, params=(self.company_id,))
        self.time_series['date'] = pd.to_datetime(self.time_series['date'])
        return self

    def analyze(self, strategy):
        if self.time_series is None:
            self.load_time_series()

        # For crypto, we may want a smaller window size by default
        if isinstance(strategy, BollingerBandsStrategy) and strategy.window_size > 10:
            # Clone the strategy with a smaller window
            strategy = BollingerBandsStrategy(window_size=10, width=strategy.width)
            print(f"Note: Using adjusted window size of 10 for cryptocurrency analysis")

        results = strategy.calculate(self.time_series)
        self.moving_average = results['moving_average']
        self.high_bollinger = results['high_bollinger']
        self.low_bollinger = results['low_bollinger']
        return self

    def assign_grade(self):
        latest_value = self.time_series['value'].iloc[-1]

        # For crypto, we adjust the grading scale to account for higher volatility
        if self.high_bollinger is not None and self.low_bollinger is not None:
            range_size = self.high_bollinger.iloc[-1] - self.low_bollinger.iloc[-1]
            middle = (self.high_bollinger.iloc[-1] + self.low_bollinger.iloc[-1]) / 2

            if latest_value > middle + (range_size * 0.3):
                self.grade = 'A'  # Potentially overbought
            elif latest_value < middle - (range_size * 0.3):
                self.grade = 'C'  # Potentially oversold
            else:
                self.grade = 'B'  # Within normal range
        else:
            self.grade = 'N/A'

        return self

    def display(self):
        print(f'Cryptocurrency: {self.name} ({self.ticker}) - Type: {self.company_type}')
        print(f'Grade: {self.grade}')
        print('Time Series Data (last 5 entries):')
        print(self.time_series.tail())

        if self.moving_average is not None:
            print('Moving Average (last 5 entries):')
            print(self.moving_average.tail())

        if self.high_bollinger is not None:
            print('High Bollinger Band (last 5 entries):')
            print(self.high_bollinger.tail())
            print('Low Bollinger Band (last 5 entries):')
            print(self.low_bollinger.tail())


# Step 3: Creator Abstract Class
class CompanyCreator:
    """Abstract creator class that declares the factory method"""

    def create_company(self):
        """Factory method to be implemented by subclasses"""
        pass

    def get_company_data(self):
        """Hook method that returns the data needed to create a company"""
        pass


# Step 4: Concrete Creators
class CompanyByTickerCreator(CompanyCreator):
    def __init__(self, ticker):
        self.ticker = ticker

    def get_company_data(self):
        conn = DatabaseConnection.getInstance().getConnection()
        cursor = conn.cursor()
        query = 'SELECT id, ticker, name, company_type FROM companies WHERE ticker = ?'
        cursor.execute(query, (self.ticker,))
        return cursor.fetchone()

    def create_company(self):
        row = self.get_company_data()
        if row:
            company_id, ticker, name, company_type = row
            if company_type == 1:  # Foreign
                return ForeignCompany(company_id, ticker, name)
            elif company_type == 2:  # Crypto
                return CryptoCurrencyCompany(company_id, ticker, name)
            else:  # Domestic (type 0)
                return DomesticCompany(company_id, ticker, name)
        return None


class CompanyByIdCreator(CompanyCreator):
    def __init__(self, company_id):
        self.company_id = company_id

    def get_company_data(self):
        conn = DatabaseConnection.getInstance().getConnection()
        cursor = conn.cursor()
        query = 'SELECT id, ticker, name, company_type FROM companies WHERE id = ?'
        cursor.execute(query, (self.company_id,))
        return cursor.fetchone()

    def create_company(self):
        row = self.get_company_data()
        if row:
            company_id, ticker, name, company_type = row
            if company_type == 1:  # Foreign
                return ForeignCompany(company_id, ticker, name)
            elif company_type == 2:  # Crypto
                return CryptoCurrencyCompany(company_id, ticker, name)
            else:  # Domestic (type 0)
                return DomesticCompany(company_id, ticker, name)
        return None


# Factory Facade
class CompanyFactory:
    @staticmethod
    def create_by_ticker(ticker):
        creator = CompanyByTickerCreator(ticker)
        return creator.create_company()

    @staticmethod
    def create_by_id(company_id):
        creator = CompanyByIdCreator(company_id)
        return creator.create_company()

## 4. Facade for Simplified Usage

In [5]:
class CompanyAnalysisFacade:
    """Facade that simplifies company analysis operations"""

    @staticmethod
    def get_company_analysis(identifier, strategy_type="bollinger", window_size=20, width=2):
        """
        Performs complete company analysis with configurable parameters

        Parameters:
        - identifier: Company ticker (string) or ID (integer)
        - strategy_type: Analysis strategy to use ("bollinger" or "sma")
        - window_size: Window size for analysis calculations
        - width: Width parameter for Bollinger Bands

        Returns:
        - Analyzed company object or None if company not found
        """
        # Create company using the appropriate factory method
        if isinstance(identifier, int):
            company = CompanyFactory.create_by_id(identifier)
        else:
            company = CompanyFactory.create_by_ticker(identifier)

        if not company:
            return None

        # Create the appropriate strategy based on type
        if strategy_type.lower() == "bollinger":
            strategy = BollingerBandsStrategy(window_size=window_size, width=width)
        elif strategy_type.lower() == "sma":
            strategy = SimpleMovingAverageStrategy(window_size=window_size)
        else:
            raise ValueError(f"Unsupported strategy type: {strategy_type}")

        # Load and analyze using method chaining
        try:
            company.load_time_series().analyze(strategy).assign_grade()
            return company
        except Exception as e:
            print(f"Analysis failed: {str(e)}")
            return None

    @staticmethod
    def get_companies_by_grade(grade, strategy_type="bollinger"):
        """Get all companies with a specific grade after analysis"""
        conn = DatabaseConnection.getInstance().getConnection()
        cursor = conn.cursor()
        cursor.execute('SELECT id FROM companies')
        company_ids = [row[0] for row in cursor.fetchall()]

        result = []
        for company_id in company_ids:
            company = CompanyAnalysisFacade.get_company_analysis(company_id, strategy_type)
            if company and company.grade == grade:
                result.append(company)

        return result

    @staticmethod
    def perform_bulk_analysis(tickers=None, strategy_type="bollinger"):
        """Analyze multiple companies at once"""
        if tickers is None:
            # Get all tickers if none specified
            conn = DatabaseConnection.getInstance().getConnection()
            cursor = conn.cursor()
            cursor.execute('SELECT ticker FROM companies')
            tickers = [row[0] for row in cursor.fetchall()]

        results = {}
        for ticker in tickers:
            company = CompanyAnalysisFacade.get_company_analysis(ticker, strategy_type)
            if company:
                results[ticker] = company

        return results

## Database initialization function

In [6]:
def initialize_database():
    conn = DatabaseConnection.getInstance().getConnection()
    cursor = conn.cursor()

    # Drop existing database if it exists
    cursor.execute('DROP TABLE IF EXISTS TimeSeries')
    cursor.execute('DROP TABLE IF EXISTS companies')
    conn.commit()

    # Create the companies table if it doesn't exist
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS companies (
        id INTEGER PRIMARY KEY,
        ticker TEXT NOT NULL,
        name TEXT NOT NULL,
        company_type INTEGER DEFAULT 0  -- 0=domestic, 1=foreign, 2=crypto
    )
    ''')

    # Synthesize data
    domestic_companies = [
        (1, 'AAPL', 'Apple Inc.', 0),
        (2, 'GOOGL', 'Alphabet Inc.', 0),
        (3, 'MSFT', 'Microsoft Corporation', 0),
        (4, 'AMZN', 'Amazon.com Inc.', 0),
        (5, 'TSLA', 'Tesla Inc.', 0),
        (6, 'FB', 'Meta Platforms Inc.', 0),
        (7, 'NVDA', 'NVIDIA Corporation', 0),
        (8, 'NFLX', 'Netflix Inc.', 0),
        (9, 'ADBE', 'Adobe Inc.', 0),
        (10, 'ORCL', 'Oracle Corporation', 0)
    ]

    foreign_companies = [
        (11, 'ZZZZ', 'Toyota Motor Corp', 1),
        (12, 'ZZZZ', 'Samsung Electronics', 1),
        (13, 'ZZZZ', 'Alibaba Group', 1),
        (14, 'ZZZZ', 'LVMH', 1),
        (15, 'ZZZZ', 'Nestle SA', 1)
    ]

    crypto_companies = [
        (16, 'BTC', 'Bitcoin', 2),
        (17, 'ETH', 'Ethereum', 2),
        (18, 'SOL', 'Solana', 2),
        (19, 'ADA', 'Cardano', 2),
        (20, 'DOT', 'Polkadot', 2)
    ]

    # Insert data into the companies table
    cursor.executemany('''
    INSERT OR IGNORE INTO companies (id, ticker, name, company_type)
    VALUES (?, ?, ?, ?)
    ''', domestic_companies + foreign_companies + crypto_companies)

    # Create the TimeSeries table
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS TimeSeries (
        id INTEGER PRIMARY KEY,
        company_id INTEGER,
        value REAL,
        date TEXT,
        FOREIGN KEY (company_id) REFERENCES companies(id)
    )
    ''')

    # Generate synthetic data including more frequent points for crypto
    time_series_data = []

    # For traditional companies - daily data
    start_date = datetime(2023, 1, 1)
    num_entries = 100

    for company in domestic_companies + foreign_companies:
        company_id = company[0]
        base_value = 100 if company_id <= 10 else 200  # Foreign companies have different base values
        for i in range(num_entries):
            date = start_date + timedelta(days=i)
            value = round(random.uniform(base_value, base_value + 400), 2)
            time_series_data.append((company_id, value, date.strftime('%Y-%m-%d %H:%M:%S')))

    # For crypto - hourly data for last 10 days (more frequent)
    for company in crypto_companies:
        company_id = company[0]
        base_value = 50  # Different base for crypto
        start_time = datetime.now() - timedelta(days=10)

        for i in range(240):  # 10 days × 24 hours
            timestamp = start_time + timedelta(hours=i)
            # More volatility for crypto
            value = round(random.uniform(base_value * 0.7, base_value * 1.3) *
                          (1 + 0.1 * math.sin(i/10)), 2)
            time_series_data.append((company_id, value, timestamp.strftime('%Y-%m-%d %H:%M:%S')))

    # Insert data into the TimeSeries table
    cursor.executemany('''
    INSERT OR IGNORE INTO TimeSeries (company_id, value, date)
    VALUES (?, ?, ?)
    ''', time_series_data)

    # Commit the transaction
    conn.commit()

## Example usage

In [7]:
if __name__ == "__main__":
    # Initialize database
    initialize_database()

    print("=== Demonstrating Factory Pattern for Different Company Types ===")

    # Create a domestic company
    print("\n1. Creating a domestic company:")
    apple = CompanyFactory.create_by_ticker('AAPL')
    if apple:
        strategy = BollingerBandsStrategy(window_size=20)
        apple.load_time_series().analyze(strategy).assign_grade()
        apple.display()

    # Create a foreign company
    print("\n2. Creating a foreign company:")
    toyota = CompanyFactory.create_by_id(11)
    if toyota:
        strategy = BollingerBandsStrategy(window_size=20)
        toyota.load_time_series().analyze(strategy).assign_grade()
        toyota.display()

    # Create a cryptocurrency company
    print("\n3. Creating a cryptocurrency company:")
    bitcoin = CompanyFactory.create_by_ticker('BTC')
    if bitcoin:
        strategy = BollingerBandsStrategy(window_size=20)  # Will be auto-adjusted
        bitcoin.load_time_series().analyze(strategy).assign_grade()
        bitcoin.display()

    # Close connection when done
    DatabaseConnection.getInstance().close()

=== Demonstrating Factory Pattern for Different Company Types ===

1. Creating a domestic company:
Company: Apple Inc. (AAPL) - Type: Domestic
Grade: B
Time Series Data:
         date   value
95 2023-04-06  283.86
96 2023-04-07  191.35
97 2023-04-08  478.15
98 2023-04-09  175.16
99 2023-04-10  483.56
Moving Average:
95    269.5135
96    270.0985
97    284.5885
98    284.4700
99    290.4855
Name: value, dtype: float64
High Bollinger Band:
95    467.315944
96    466.848313
97    497.971392
98    498.105707
99    519.670710
Name: value, dtype: float64
Low Bollinger Band:
95    71.711056
96    73.348687
97    71.205608
98    70.834293
99    61.300290
Name: value, dtype: float64

2. Creating a foreign company:
Company: Toyota Motor Corp (ID: 11) - Type: Foreign
Grade: B
Time Series Data:
         date   value
95 2023-04-06  218.07
96 2023-04-07  374.76
97 2023-04-08  279.42
98 2023-04-09  536.23
99 2023-04-10  351.37
Moving Average:
95    370.0010
96    371.5430
97    361.3355
98    361.604