In [0]:
"""
Key Library Explanations:

Data Processing and Analysis:
- pandas (as pd): Library for data manipulation and analysis, provides DataFrame structures
- numpy: Often used with pandas for numerical operations (though not directly imported in your code)

Financial Data:
- yfinance (as yf): Downloads financial market data from Yahoo Finance
                    Provides historical prices, company info, and financial statements
                    
Technical Analysis:
- ta: Technical analysis library with 130+ indicators including:
      - Momentum indicators (RSI, MACD)
      - Volume indicators
      - Volatility indicators (Bollinger Bands)
      - Trend indicators (Moving Averages)

Visualization:
- matplotlib.pyplot (as plt): Creating static, animated, and interactive visualizations

Spark and Big Data:
- pyspark.sql: Apache Spark's module for structured data processing
- SparkSession: Entry point for DataFrame and SQL functionality
- functions (as F): Spark SQL functions for data transformations
- Window (as W): For window-based operations like running totals
- StructType/Field: Define schema for Spark DataFrames

System and Utilities:
- subprocess: Running external commands and managing subprocesses
- sys: Python runtime environment access
- gc: Garbage collection for memory management
- warnings: Control warning messages
- time.sleep: Add delays in code execution

Email Functionality:
- smtplib: Send emails using SMTP protocol
- email.mime.text: Create and format email messages
"""

import subprocess
import warnings
import sys
import os

warnings.filterwarnings('ignore')

def install_quietly(packages):
    """Install packages quietly using pip."""
    for package in packages:
        try:
            with open(os.devnull, 'w') as devnull:
                subprocess.check_call(
                    [sys.executable, '-m', 'pip', 'install', '--quiet', package],
                    stdout=devnull,
                    stderr=subprocess.STDOUT
                )
        except subprocess.CalledProcessError as e:
            print(f"Error installing {package}: {str(e)}")

# Required packages
packages = [
    'pandas',
    'yfinance', 
    'ta',
    'matplotlib'
]

try:
    install_quietly(packages)
    dbutils.library.restartPython()
except Exception as e:
    print(f"Error: {str(e)}")



In [0]:
# Data Processing and Analysis
import pandas as pd                     # Data manipulation and analysis library
from pyspark.sql import SparkSession    # Entry point for Spark SQL and DataFrame
from pyspark.sql.types import *         # Spark SQL data types for schema definition
from pyspark.sql import functions as F   # Spark SQL functions for data transformations
from pyspark.sql import Window as W      # Window functions for operations like rolling calculations

# Financial Data and Technical Analysis
import yfinance as yf                   # Yahoo Finance API wrapper for market data
import ta                               # Technical analysis library for financial indicators

# Visualization
import matplotlib.pyplot as plt         # Data visualization library

# System Utilities
import gc                              # Garbage collector for memory management
import sys                             # System-specific parameters and functions
from time import sleep                 # Time-related functions, used for delays

# Email Communications
import smtplib                         # SMTP protocol client for sending emails
from email.mime.text import MIMEText   # MIME text formatting for email content

def send_message(subject="Email by Python", body="Default by Python"):
    """By Ricardo Kazuo"""
    # Read credentials and addresses from files
    with open('/Volumes/workspace/default/data/gmail.txt', 'r') as file:
        password = file.read().strip()
    
    with open('/Volumes/workspace/default/data/sender.txt', 'r') as file:
        sender_email = file.read().strip()
        
    with open('/Volumes/workspace/default/data/receiver.txt', 'r') as file:
        receiver_email = file.read().strip()
        
    message = MIMEText(body)
    message['to'] = receiver_email
    message['from'] = f"Databricks - Extractor<{sender_email}>"
    message['subject'] = subject
    server = smtplib.SMTP('smtp.gmail.com:587')
    server.ehlo_or_helo_if_needed()
    server.starttls()
    server.ehlo_or_helo_if_needed()
    server.login(sender_email, password)
    server.sendmail(f"Databricks - Extractor<{sender_email}>", receiver_email, message.as_string())
    server.quit()

def process_financial_data(df):
    """
    Process financial data and add technical indicators
    """
    # Convert required columns to numeric
    df['Close_Price'] = pd.to_numeric(df['Close'])
    df['Volume_Num'] = pd.to_numeric(df['Volume'])
    
    # Calculate Technical Indicators
    # Moving Averages
    df['SMA_20'] = df['Close_Price'].rolling(window=20).mean()
    df['EMA_20'] = df['Close_Price'].ewm(span=20, adjust=False).mean()
    
    # RSI
    df['RSI'] = ta.momentum.RSIIndicator(df['Close_Price']).rsi()
    
    # MACD
    macd = ta.trend.MACD(df['Close_Price'])
    df['MACD'] = macd.macd()
    df['MACD_Signal'] = macd.macd_signal()
    df['MACD_Histogram'] = macd.macd_diff()
    
    # Bollinger Bands
    bollinger = ta.volatility.BollingerBands(df['Close_Price'])
    df['BB_Upper'] = bollinger.bollinger_hband()
    df['BB_Lower'] = bollinger.bollinger_lband()
    df['BB_Middle'] = bollinger.bollinger_mavg()
    
    # Volume Indicators
    df['Volume_SMA_20'] = df['Volume_Num'].rolling(window=20).mean()
    
    # Price Momentum
    df['Price_Change'] = df['Close_Price'].pct_change()
    df['Price_Change_20D'] = df['Close_Price'].pct_change(periods=20)
    
    # Add time-based features
    df['Date_Time'] = pd.to_datetime(df['Date'])
    df['day_of_week'] = df['Date_Time'].dt.dayofweek
    df['month'] = df['Date_Time'].dt.month
    df['year'] = df['Date_Time'].dt.year
    df['is_month_end'] = df['Date_Time'].dt.is_month_end
    
    return df

# Create or get Spark session
spark = SparkSession.builder.getOrCreate()

# Drop the existing table if it exists
spark.sql("DROP TABLE IF EXISTS default.bronze_financial_stocks")
print("Dropped existing table if it existed")

# Define the specific tickers we want to process
ticker_info = [
    {"ticker": "AAPL", "company_name": "Apple Inc."},
    {"ticker": "MSFT", "company_name": "Microsoft Corporation"},
    {"ticker": "TSLA", "company_name": "Tesla, Inc."},
    {"ticker": "NVDA", "company_name": "NVIDIA Corporation"}
]

print(f"\nTotal stocks to process: {len(ticker_info)}")

# Send start notification
send_message(
    subject="Started Processing Selected Stocks",
    body="Beginning to fetch and process data for AAPL, MSFT, TSLA, and NVDA."
)

# Initialize an empty list for the data
financial_data = []

# Process each ticker
for info in ticker_info:
    ticker_symbol = info['ticker']
    try:
        print(f"\nProcessing {ticker_symbol}")
        # Get historical data
        ticker = yf.Ticker(ticker_symbol)
        hist_data = ticker.history(period="max")
        
        if hist_data.empty:
            print(f"No data found for {ticker_symbol}")
            continue
            
        # Reset index and add ticker column
        hist_data = hist_data.reset_index()
        hist_data['Ticker'] = ticker_symbol
        hist_data['Company_Name'] = info['company_name']
        hist_data['Index'] = 'N/A'
        
        # Process the data with technical indicators
        processed_data = process_financial_data(hist_data)
        
        # Append to data list
        financial_data.append(processed_data)
        print(f"Successfully processed {ticker_symbol} - {info['company_name']}")
            
    except Exception as e:
        print(f"Error processing {ticker_symbol}: {str(e)}")
        continue

# Process the collected data
if financial_data:
    try:
        combined_data = pd.concat(financial_data, ignore_index=True)
        
        # Convert to list of rows
        data_rows = combined_data.to_dict('records')
        
        # Define schema including technical indicators
        # Define schema including technical indicators
        schema = StructType([
            # Basic Stock Data (original fields from yfinance)
            StructField("Date", DateType(), True),           
            StructField("Open", DoubleType(), True),         
            StructField("High", DoubleType(), True),          
            StructField("Low", DoubleType(), True),           
            StructField("Close", DoubleType(), True),        
            StructField("Volume", DoubleType(), True),       
            StructField("Dividends", DoubleType(), True),    
            StructField("Stock_Splits", DoubleType(), True), 
            
            # Stock Identification (these were missing)
            StructField("Ticker", StringType(), True),       # Stock symbol
            StructField("Company_Name", StringType(), True), # Company name
            StructField("Index", StringType(), True),        # Index identifier
            
            # Your calculated fields (remain the same)
            StructField("Close_Price", DoubleType(), True),  
            StructField("Volume_Num", DoubleType(), True),   
            StructField("SMA_20", DoubleType(), True),      
            StructField("EMA_20", DoubleType(), True),      
            StructField("RSI", DoubleType(), True),         
            StructField("MACD", DoubleType(), True),        
            StructField("MACD_Signal", DoubleType(), True), 
            StructField("MACD_Histogram", DoubleType(), True), 
            StructField("BB_Upper", DoubleType(), True),    
            StructField("BB_Lower", DoubleType(), True),    
            StructField("BB_Middle", DoubleType(), True),   
            StructField("Volume_SMA_20", DoubleType(), True), 
            StructField("Price_Change", DoubleType(), True),    
            StructField("Price_Change_20D", DoubleType(), True), 
            StructField("Date_Time", TimestampType(), True),    
            StructField("day_of_week", IntegerType(), True),    
            StructField("month", IntegerType(), True),          
            StructField("year", IntegerType(), True),           
            StructField("is_month_end", BooleanType(), True)    
        ])
        
        # Create Spark DataFrame
        spark_df = spark.createDataFrame(data_rows, schema)
        
        # Write to Delta table
        spark_df.write \
            .format("delta") \
            .mode("overwrite") \
            .saveAsTable("default.bronze_financial_stocks")
        
        print("\nSuccessfully wrote data to bronze table")
        
        # Clear data
        del combined_data, data_rows, spark_df
        gc.collect()
        
    except Exception as e:
        print(f"Error processing data: {str(e)}")
else:
    print("No data was collected")

# Final verification and get row counts
row_counts_df = spark.sql("""
    SELECT 
        Ticker,
        Company_Name,
        Count(*) as record_count,
        COUNT(SMA_20) as rows_with_sma,
        COUNT(RSI) as rows_with_rsi,
        MIN(Date_Time) as earliest_date,
        MAX(Date_Time) as latest_date
    FROM default.bronze_financial_stocks 
    GROUP BY Ticker, Company_Name
    ORDER BY Ticker
""")

# Show the results
print("\nVerifying final data - showing stocks and their record counts:")
row_counts_df.show(truncate=False)

# Create the completion message with detailed counts
completion_message = "Successfully completed processing stock data.\n\nData summary by ticker:\n"
for row in row_counts_df.collect():
    completion_message += f"\n{row.Ticker} ({row.Company_Name}):\n"
    completion_message += f"- Total records: {row.record_count}\n"
    completion_message += f"- Records with technical indicators: {row.rows_with_sma}\n"
    completion_message += f"- Date range: {row.earliest_date} to {row.latest_date}\n"

# Send completion notification
send_message(
    subject="Stock Data Processing Complete",
    body=completion_message
)

print("\nScript completed successfully!")

Dropped existing table if it existed

Total stocks to process: 4

Processing AAPL
Successfully processed AAPL - Apple Inc.

Processing MSFT
Successfully processed MSFT - Microsoft Corporation

Processing TSLA
Successfully processed TSLA - Tesla, Inc.

Processing NVDA
Successfully processed NVDA - NVIDIA Corporation

Successfully wrote data to bronze table

Verifying final data - showing stocks and their record counts:
+------+---------------------+------------+-------------+-------------+-------------------+-------------------+
|Ticker|Company_Name         |record_count|rows_with_sma|rows_with_rsi|earliest_date      |latest_date        |
+------+---------------------+------------+-------------+-------------+-------------------+-------------------+
|AAPL  |Apple Inc.           |11097       |11097        |11097        |1980-12-12 05:00:00|2024-12-18 05:00:00|
|MSFT  |Microsoft Corporation|9771        |9771         |9771         |1986-03-13 05:00:00|2024-12-18 05:00:00|
|NVDA  |NVIDIA Cor

In [0]:
# Gold Layer Processing - Main Objectives:
# 1. Analyze stock performance and market trends
# 2. Generate trading signals based on technical indicators
# 3. Identify market extremes and stock-specific patterns
# 4. Provide comprehensive market analysis for decision making
print("\nStarting gold layer processing...")

def gold_layer_processing():
    """
    Analyzes relationships and patterns in financial stock data by:
    1. Identifying monthly high/low points for market trend analysis
    2. Calculating technical indicators for trading signals
    3. Generating buy/sell signals based on RSI and Bollinger Bands
    4. Computing stock-specific and market-wide statistics for comparison
    """
    try:
        # Drop existing gold table before processing
        print("Dropping existing gold table if it exists...")
        spark.sql("DROP TABLE IF EXISTS default.gold_financial_stocks")
        
        # Create a temporary view of the bronze table
        print("Creating temporary view of bronze data...")
        spark.sql("CREATE OR REPLACE TEMPORARY VIEW temp_bronze_stocks AS SELECT * FROM default.bronze_financial_stocks")
        
        # Main analysis query
        query = """
        -- RankedStocks CTE
        -- Objective: Identify monthly price extremes for each stock
        -- Purpose: Used to understand market cycles and monthly trading ranges
        WITH RankedStocks AS (
            SELECT 
                CAST(Date_Time AS DATE) as Date,
                Ticker,
                Company_Name,
                Close_Price,
                Volume_Num,
                SMA_20,
                RSI,
                -- Rank prices to find monthly highs
                ROW_NUMBER() OVER(
                    PARTITION BY DATE_TRUNC('month', CAST(Date_Time AS DATE)) 
                    ORDER BY Close_Price DESC
                ) as RowNumHigh,
                -- Rank prices to find monthly lows
                ROW_NUMBER() OVER(
                    PARTITION BY DATE_TRUNC('month', CAST(Date_Time AS DATE)) 
                    ORDER BY Close_Price ASC
                ) as RowNumLow
            FROM temp_bronze_stocks
            WHERE Date_Time >= '2020-01-01'
        ),
        
        -- ExtremePoints CTE
        -- Objective: Extract only the highest and lowest prices of each month
        -- Purpose: Used for trend analysis and support/resistance levels
        ExtremePoints AS (
            SELECT *
            FROM RankedStocks
            WHERE RowNumHigh = 1 OR RowNumLow = 1
        ),
        
        -- TechStocks CTE
        -- Objective: Gather all technical indicators for analysis
        -- Purpose: Combine multiple indicators for stronger trading signals
        TechStocks AS (
            SELECT 
                CAST(Date_Time AS DATE) as Date,
                Ticker,
                Company_Name,
                Close_Price,
                Volume_Num,
                SMA_20,        -- Trend direction
                RSI,           -- Overbought/Oversold conditions
                MACD,          -- Momentum and trend changes
                BB_Upper,      -- Volatility upper bound
                BB_Lower       -- Volatility lower bound
            FROM temp_bronze_stocks
            WHERE Date_Time >= '2020-01-01'
        ),
        
        -- JoinedData CTE
        -- Objective: Combine technical data with market extremes
        -- Purpose: Compare individual stock performance against market conditions
        JoinedData AS (
            SELECT 
                ts.Date,
                ts.Ticker,
                ts.Company_Name,
                ts.Close_Price as StockClose,
                ts.Volume_Num as StockVolume,
                ts.SMA_20,
                ts.RSI,
                ts.MACD,
                ts.BB_Upper,
                ts.BB_Lower,
                ep.Close_Price as BenchmarkClose,
                CASE 
                    WHEN ep.RowNumHigh = 1 THEN 'High'
                    WHEN ep.RowNumLow = 1 THEN 'Low'
                END as MarketExtreme
            FROM TechStocks ts
            JOIN ExtremePoints ep 
            ON DATE_TRUNC('month', ts.Date) = DATE_TRUNC('month', ep.Date)
        ),
        
        -- AggregatedData CTE
        -- Objective: Calculate comparative statistics and averages
        -- Purpose: Provide context for current price levels and volume
        AggregatedData AS (
            SELECT 
                Date,
                Ticker,
                Company_Name,
                StockClose,
                StockVolume,
                SMA_20,
                RSI,
                MACD,
                BB_Upper,
                BB_Lower,
                BenchmarkClose,
                MarketExtreme,
                -- Calculate averages for comparison
                AVG(StockClose) OVER (PARTITION BY Ticker) as AvgStockClose,     -- Stock's historical average
                AVG(StockVolume) OVER (PARTITION BY Ticker) as AvgStockVolume,   -- Normal trading volume
                AVG(StockClose) OVER () as AvgMarketClose                        -- Market average
            FROM JoinedData
        )
        
        -- Final SELECT: Generate actionable insights and signals
        -- Objective: Produce final analysis with trading signals and market context
        SELECT 
            Date,
            Ticker,
            Company_Name,
            StockClose,
            StockVolume,
            SMA_20,
            RSI,
            MACD,
            BB_Upper,
            BB_Lower,
            BenchmarkClose,
            MarketExtreme,
            AvgStockClose,
            AvgStockVolume,
            AvgMarketClose,
            -- Deviation calculations for trend strength
            (StockClose - AvgStockClose) / NULLIF(AvgStockClose, 0) * 100 as StockDeviation,
            (StockClose - AvgMarketClose) / NULLIF(AvgMarketClose, 0) * 100 as MarketDeviation,
            -- RSI-based trading signals
            CASE 
                WHEN RSI > 70 THEN 'Overbought'    -- Potential sell signal
                WHEN RSI < 30 THEN 'Oversold'      -- Potential buy signal
                ELSE 'Neutral'
            END as RSI_Signal,
            -- Bollinger Bands trading signals
            CASE
                WHEN StockClose > BB_Upper THEN 'Above Upper Band'   -- Price above normal range
                WHEN StockClose < BB_Lower THEN 'Below Lower Band'   -- Price below normal range
                ELSE 'Within Bands'                                  -- Price within normal range
            END as BB_Signal
        FROM AggregatedData
        ORDER BY Date, Ticker
        """
        
        print("Executing gold layer analysis...")
        # Execute the analysis query and save results to gold table
        spark.sql(query).write \
            .format("delta") \
            .mode("overwrite") \
            .saveAsTable("default.gold_financial_stocks")
        
        print("Gold layer table created successfully")
        
        # Clean up temporary view
        spark.sql("DROP VIEW IF EXISTS temp_bronze_stocks")
        
        # Generate summary statistics
        print("Generating summary statistics...")
        return spark.sql("""
            SELECT 
                Ticker,
                Company_Name,
                COUNT(*) as record_count,
                MIN(Date) as earliest_date,
                MAX(Date) as latest_date,
                AVG(StockDeviation) as avg_stock_deviation,
                AVG(MarketDeviation) as avg_market_deviation,
                COUNT(CASE WHEN RSI_Signal = 'Overbought' THEN 1 END) as overbought_count,
                COUNT(CASE WHEN RSI_Signal = 'Oversold' THEN 1 END) as oversold_count,
                COUNT(CASE WHEN BB_Signal != 'Within Bands' THEN 1 END) as volatility_events
            FROM default.gold_financial_stocks
            GROUP BY Ticker, Company_Name
            ORDER BY Ticker
        """)
        
    except Exception as e:
        print(f"Error in gold layer processing: {str(e)}")
        raise

try:
    # Execute gold layer processing and get summary
    gold_summary = gold_layer_processing()
    print("\nGold layer processing summary:")
    gold_summary.show(truncate=False)

    # Update completion message with gold layer results
    completion_message = "Gold layer analysis completed with the following insights:"
    for row in gold_summary.collect():
        completion_message += f"\n\n{row.Ticker} ({row.Company_Name}):\n"
        completion_message += f"- Total analyzed periods: {row.record_count}\n"
        completion_message += f"- Date range: {row.earliest_date} to {row.latest_date}\n"
        completion_message += f"- Average deviation from stock mean: {row.avg_stock_deviation:.2f}%\n"
        completion_message += f"- Average deviation from market mean: {row.avg_market_deviation:.2f}%\n"
        completion_message += f"- Overbought periods: {row.overbought_count}\n"
        completion_message += f"- Oversold periods: {row.oversold_count}\n"
        completion_message += f"- Volatility events: {row.volatility_events}"

    # Send the completion notification
    send_message(
        subject="Gold Layer Processing Complete",
        body=completion_message
    )

except Exception as e:
    error_message = f"Error in gold layer processing and reporting: {str(e)}"
    print(error_message)
    send_message(
        subject="Gold Layer Processing Error",
        body=error_message
    )

print("\nGold layer processing completed!")


Starting gold layer processing...
Dropping existing gold table if it exists...
Creating temporary view of bronze data...
Executing gold layer analysis...
Gold layer table created successfully
Generating summary statistics...

Gold layer processing summary:
+------+---------------------+------------+-------------+-----------+----------------------+--------------------+----------------+--------------+-----------------+
|Ticker|Company_Name         |record_count|earliest_date|latest_date|avg_stock_deviation   |avg_market_deviation|overbought_count|oversold_count|volatility_events|
+------+---------------------+------------+-------------+-----------+----------------------+--------------------+----------------+--------------+-----------------+
|AAPL  |Apple Inc.           |2500        |2020-01-02   |2024-12-18 |-4.200728653813712E-15|-12.028627491987008 |300             |34            |288              |
|MSFT  |Microsoft Corporation|2500        |2020-01-02   |2024-12-18 |-3.29123395204078

In [0]:
def gold_layer_investment_growth():
    """
    Calculates the investment growth for each stock by analyzing daily returns.
    This analysis helps understand:
    1. Long-term investment performance
    2. Compound returns over time
    3. Comparative stock performance
    4. Investment value evolution
    """
    try:
        # Drop existing gold table if it exists
        print("Dropping existing investment growth table if it exists...")
        spark.sql("DROP TABLE IF EXISTS default.gold_investment_growth")
        
        # Create a temporary view
        print("Creating temporary view of bronze data...")
        spark.sql("CREATE OR REPLACE TEMPORARY VIEW temp_bronze_stocks AS SELECT * FROM default.bronze_financial_stocks")
        
        # Investment growth analysis query
        query = """
        -- Base data CTE
        -- Objective: Filter and prepare the initial dataset with proper date formatting
        WITH BaseData AS (
            SELECT 
                CAST(Date_Time AS DATE) as Date,
                Ticker,
                Company_Name,
                Close_Price as Close
            FROM temp_bronze_stocks
            WHERE Date_Time >= '2020-01-01'
            ORDER BY Ticker, Date
        ),
        
        -- DailyReturns CTE
        -- Objective: Calculate day-over-day returns for each stock
        -- Purpose: Measure daily price changes as percentage returns
        DailyReturns AS (
            SELECT
                Date,
                Ticker,
                Company_Name,
                Close,
                -- Calculate daily return using current and previous day's price
                (Close / LAG(Close, 1) OVER (PARTITION BY Ticker ORDER BY Date)) - 1 as Daily_Return
            FROM BaseData
        ),
        
        -- CumulativeReturns CTE
        -- Objective: Calculate cumulative investment growth
        -- Purpose: Track the value of $1 invested at the start
        CumulativeReturns AS (
            SELECT
                Date,
                Ticker,
                Company_Name,
                Close,
                Daily_Return,
                -- Calculate cumulative value using product of (1 + daily returns)
                EXP(SUM(LN(1 + COALESCE(Daily_Return, 0))) OVER (PARTITION BY Ticker ORDER BY Date)) 
                as Investment_Value
            FROM DailyReturns
        ),
        
        -- FinalValues CTE
        -- Objective: Get the latest investment value for each stock
        -- Purpose: Determine final investment performance
        FinalValues AS (
            SELECT
                Ticker,
                Company_Name,
                FIRST_VALUE(Date) OVER (PARTITION BY Ticker ORDER BY Date) as Start_Date,
                LAST_VALUE(Date) OVER (PARTITION BY Ticker ORDER BY Date) as End_Date,
                LAST_VALUE(Investment_Value) OVER (PARTITION BY Ticker ORDER BY Date) as Final_Value,
                MIN(Investment_Value) OVER (PARTITION BY Ticker) as Lowest_Value,
                MAX(Investment_Value) OVER (PARTITION BY Ticker) as Highest_Value,
                AVG(Daily_Return) OVER (PARTITION BY Ticker) * 252 as Annualized_Return, -- 252 trading days
                STDDEV(Daily_Return) OVER (PARTITION BY Ticker) * SQRT(252) as Annualized_Volatility
            FROM CumulativeReturns
        )
        
        -- Final SELECT: Generate comprehensive investment analysis
        -- Objective: Provide complete investment performance metrics
        SELECT DISTINCT
            Ticker,
            Company_Name,
            Start_Date,
            End_Date,
            ROUND(Final_Value, 2) as Final_Investment_Value,
            ROUND(Lowest_Value, 2) as Minimum_Investment_Value,
            ROUND(Highest_Value, 2) as Maximum_Investment_Value,
            ROUND(Annualized_Return * 100, 2) as Annualized_Return_Pct,
            ROUND(Annualized_Volatility * 100, 2) as Annualized_Volatility_Pct,
            ROUND((Final_Value - 1) * 100, 2) as Total_Return_Pct,
            ROUND((Highest_Value / Lowest_Value - 1) * 100, 2) as Max_Drawdown_Pct
        FROM FinalValues
        ORDER BY Final_Investment_Value DESC
        """
        
        print("Executing investment growth analysis...")
        # Execute analysis and save to gold table
        spark.sql(query).write \
            .format("delta") \
            .mode("overwrite") \
            .saveAsTable("default.gold_investment_growth")
            
        print("Investment growth table created successfully")
        
        # Clean up temporary view
        spark.sql("DROP VIEW IF EXISTS temp_bronze_stocks")
        
        # Generate summary statistics
        print("Generating investment summary...")
        return spark.sql("""
            SELECT 
                Ticker,
                Company_Name,
                Final_Investment_Value,
                Total_Return_Pct,
                Annualized_Return_Pct,
                Annualized_Volatility_Pct,
                Max_Drawdown_Pct,
                Start_Date,
                End_Date
            FROM default.gold_investment_growth
            ORDER BY Final_Investment_Value DESC
        """)
        
    except Exception as e:
        print(f"Error in investment growth analysis: {str(e)}")
        raise

try:
    # Execute gold layer processing and get summary
    print("\nStarting gold layer processing...")
    gold_summary = gold_layer_processing()
    print("\nGold layer processing summary:")
    gold_summary.show(truncate=False)

    # Create comprehensive message with explanations
    completion_message = "Gold Layer Analysis Completed\n\n"
    completion_message += "METRIC EXPLANATIONS:\n"
    completion_message += "1. Average Deviation from Stock Mean:\n"
    completion_message += "   • Shows how far the stock's price deviates from its own historical average\n"
    completion_message += "   • Values close to 0% suggest stable trading around historical average\n"
    completion_message += "   • Larger deviations indicate significant price movements\n\n"
    completion_message += "2. Average Deviation from Market Mean:\n"
    completion_message += "   • Compares stock's price to the average of all analyzed stocks\n"
    completion_message += "   • Negative values indicate trading below market average\n"
    completion_message += "   • Positive values indicate trading above market average\n\n"
    completion_message += "3. Overbought/Oversold Periods:\n"
    completion_message += "   • Based on RSI (Relative Strength Index)\n"
    completion_message += "   • Overbought (RSI > 70): Indicates strong upward momentum\n"
    completion_message += "   • Oversold (RSI < 30): Indicates potential buying opportunities\n\n"
    completion_message += "4. Volatility Events:\n"
    completion_message += "   • Times price moved outside Bollinger Bands\n"
    completion_message += "   • Higher numbers indicate more volatile price action\n\n"
    completion_message += "DETAILED ANALYSIS BY STOCK:\n"

    for row in gold_summary.collect():
        completion_message += f"\n{row.Ticker} ({row.Company_Name}):\n"
        completion_message += f"- Total analyzed periods: {row.record_count}\n"
        completion_message += f"- Date range: {row.earliest_date} to {row.latest_date}\n"
        completion_message += f"- Average deviation from stock mean: {row.avg_stock_deviation:.2f}%\n"
        completion_message += f"- Average deviation from market mean: {row.avg_market_deviation:.2f}%\n"
        completion_message += f"- Overbought periods: {row.overbought_count}\n"
        completion_message += f"- Oversold periods: {row.oversold_count}\n"
        completion_message += f"- Volatility events: {row.volatility_events}\n"

        # Add stock-specific interpretation
        completion_message += "\nKey Insights:\n"
        
        # Price stability interpretation
        if abs(row.avg_stock_deviation) < 5:
            completion_message += "• Stock shows stable price behavior around its average\n"
        else:
            completion_message += "• Stock shows notable price movements from its average\n"
        
        # Market comparison interpretation
        if row.avg_market_deviation > 0:
            completion_message += f"• Trades {abs(row.avg_market_deviation):.1f}% above market average\n"
        else:
            completion_message += f"• Trades {abs(row.avg_market_deviation):.1f}% below market average\n"
        
        # Momentum interpretation
        if row.overbought_count > row.oversold_count * 2:
            completion_message += "• Strong upward momentum dominates\n"
        elif row.oversold_count > row.overbought_count * 2:
            completion_message += "• Shows significant downward pressure\n"
        
        # Volatility interpretation
        if row.volatility_events > 300:
            completion_message += "• Exhibits high volatility, requires careful monitoring\n"
        else:
            completion_message += "• Shows manageable volatility levels\n"

    # Send completion notification
    send_message(
        subject="Gold Layer Analysis Complete",
        body=completion_message
    )

except Exception as e:
    error_message = f"Error in gold layer processing and reporting: {str(e)}"
    print(error_message)
    send_message(
        subject="Gold Layer Analysis Error",
        body=error_message
    )

print("\nGold layer processing completed!")


Starting gold layer processing...
Dropping existing gold table if it exists...
Creating temporary view of bronze data...
Executing gold layer analysis...
Gold layer table created successfully
Generating summary statistics...

Gold layer processing summary:
+------+---------------------+------------+-------------+-----------+----------------------+--------------------+----------------+--------------+-----------------+
|Ticker|Company_Name         |record_count|earliest_date|latest_date|avg_stock_deviation   |avg_market_deviation|overbought_count|oversold_count|volatility_events|
+------+---------------------+------------+-------------+-----------+----------------------+--------------------+----------------+--------------+-----------------+
|AAPL  |Apple Inc.           |2500        |2020-01-02   |2024-12-18 |-4.200728653813712E-15|-12.028627491987008 |300             |34            |288              |
|MSFT  |Microsoft Corporation|2500        |2020-01-02   |2024-12-18 |-3.29123395204078

In [0]:
from email.mime.multipart import MIMEMultipart
from email.mime.image import MIMEImage
from email.mime.text import MIMEText
import io
import matplotlib
matplotlib.use('Agg')  # Use Agg backend
import matplotlib.pyplot as plt

def send_message_with_images(subject="Email by Python", body="Default by Python", figures=None):
    """
    Enhanced version of send_message that handles matplotlib figures directly
    figures: list of tuples (figure, name)
    """
    # Read credentials
    with open('/Volumes/workspace/default/data/gmail.txt', 'r') as file:
        password = file.read().strip()
    
    with open('/Volumes/workspace/default/data/sender.txt', 'r') as file:
        sender_email = file.read().strip()
        
    with open('/Volumes/workspace/default/data/receiver.txt', 'r') as file:
        receiver_email = file.read().strip()
    
    # Create message container
    message = MIMEMultipart('related')
    message['Subject'] = subject
    message['From'] = f"Databricks - Extractor<{sender_email}>"
    message['To'] = receiver_email
    
    # Create the HTML body
    html = f"""
    <html>
      <body>
        <h2>Technical Analysis Report</h2>
        
        <h3>Market Overview:</h3>
        <p>- The market deviation chart shows the relative performance of each stock compared to the market average.</p>
        <p>- Positive deviations indicate outperformance, negative deviations indicate underperformance.</p>
        
        <h3>Individual Stock Analysis:</h3>
        <p>Each technical dashboard shows:</p>
        <ol>
            <li>Price movement with Bollinger Bands</li>
            <li>RSI indicator (overbought >70, oversold <30)</li>
            <li>MACD trend indicator</li>
        </ol>
        
        <h3>Charts:</h3>
        {body}
      </body>
    </html>
    """
    
    # Record the MIME types of text/html and image
    html_part = MIMEText(html, 'html')
    message.attach(html_part)
    
    # Attach figures
    if figures:
        for i, (fig, name) in enumerate(figures):
            # Save figure to bytes buffer
            buf = io.BytesIO()
            fig.savefig(buf, format='png', dpi=300, bbox_inches='tight')
            buf.seek(0)
            img_data = buf.getvalue()
            
            image = MIMEImage(img_data)
            image.add_header('Content-ID', f'<image{i}>')
            image.add_header('Content-Disposition', 'inline', filename=name)
            message.attach(image)
            buf.close()
    
    # Send email
    server = smtplib.SMTP('smtp.gmail.com:587')
    server.ehlo_or_helo_if_needed()
    server.starttls()
    server.ehlo_or_helo_if_needed()
    server.login(sender_email, password)
    server.sendmail(f"Databricks - Extractor<{sender_email}>", receiver_email, message.as_string())
    server.quit()

def create_and_send_visualizations():
    """Create visualizations, display them, and send via email"""
    # Get the data from gold table
    gold_df = spark.sql("""
        SELECT *
        FROM default.gold_financial_stocks
        ORDER BY Date, Ticker
    """).toPandas()
    
    figures = []
    html_images = []
    plt.style.use('seaborn')
    
    print("Generating and displaying visualizations...")
    
    # 1. Market Deviation Analysis
    fig1 = plt.figure(figsize=(12, 6))
    for ticker in gold_df['Ticker'].unique():
        stock_data = gold_df[gold_df['Ticker'] == ticker]
        plt.plot(stock_data['Date'], stock_data['MarketDeviation'], label=ticker)
    
    plt.title('Stock Deviations from Market Average')
    plt.xlabel('Date')
    plt.ylabel('Deviation (%)')
    plt.legend()
    plt.grid(True)
    
    # Display the plot
    plt.show()
    
    figures.append((fig1, 'market_deviation.png'))
    html_images.append('<img src="cid:image0" width="800px"><br><br>')
    
    # 2. Technical Analysis Dashboard for each stock
    for i, ticker in enumerate(gold_df['Ticker'].unique(), 1):
        print(f"\nGenerating dashboard for {ticker}...")
        stock_data = gold_df[gold_df['Ticker'] == ticker].copy()
        
        fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(15, 12), height_ratios=[2, 1, 1])
        fig.suptitle(f'Technical Analysis Dashboard - {ticker}', fontsize=16)
        
        # Price and Bollinger Bands
        ax1.plot(stock_data['Date'], stock_data['StockClose'], label='Price', color='blue')
        ax1.plot(stock_data['Date'], stock_data['BB_Upper'], '--', label='Upper BB', color='gray', alpha=0.7)
        ax1.plot(stock_data['Date'], stock_data['BB_Lower'], '--', label='Lower BB', color='gray', alpha=0.7)
        ax1.fill_between(stock_data['Date'], stock_data['BB_Upper'], stock_data['BB_Lower'], alpha=0.1)
        ax1.set_title('Price and Bollinger Bands')
        ax1.legend()
        
        # RSI
        ax2.plot(stock_data['Date'], stock_data['RSI'], color='purple')
        ax2.axhline(y=70, color='r', linestyle='--', alpha=0.5)
        ax2.axhline(y=30, color='g', linestyle='--', alpha=0.5)
        ax2.set_title('RSI')
        ax2.set_ylim(0, 100)
        
        # MACD
        ax3.plot(stock_data['Date'], stock_data['MACD'], label='MACD', color='blue')
        ax3.axhline(y=0, color='k', linestyle='--', alpha=0.3)
        ax3.set_title('MACD')
        ax3.legend()
        
        plt.tight_layout()
        
        # Display the plot
        plt.show()
        
        figures.append((fig, f'technical_dashboard_{ticker}.png'))
        html_images.append(f'<img src="cid:image{i}" width="800px"><br><br>')

    # Send email with images
    print("\nSending email with visualizations...")
    send_message_with_images(
        subject="Stock Market Technical Analysis Charts",
        body=''.join(html_images),
        figures=figures
    )
    
    # Clean up
    for fig, _ in figures:
        plt.close(fig)
    
    print("Visualizations have been generated, displayed, and sent via email!")

# Execute the function
create_and_send_visualizations()

  plt.style.use('seaborn')


Generating and displaying visualizations...

Generating dashboard for AAPL...

Generating dashboard for MSFT...

Generating dashboard for NVDA...

Generating dashboard for TSLA...

Sending email with visualizations...
Visualizations have been generated, displayed, and sent via email!


In [0]:
import smtplib, time, datetime, inspect, os
from time import sleep
from email.mime.text import MIMEText

def send_message(subject="Email by Python", body="Default by Python"):
    """By Ricardo Kazuo"""
    # Read credentials and addresses from files
    with open('/Volumes/workspace/default/data/gmail.txt', 'r') as file:
        password = file.read().strip()
    
    with open('/Volumes/workspace/default/data/sender.txt', 'r') as file:
        sender_email = file.read().strip()
        
    with open('/Volumes/workspace/default/data/receiver.txt', 'r') as file:
        receiver_email = file.read().strip()
        
    message = MIMEText(body)
    message['to'] = receiver_email
    message['from'] = f"Databricks - Extractor<{sender_email}>"
    message['subject'] = subject
    server = smtplib.SMTP('smtp.gmail.com:587')
    server.ehlo_or_helo_if_needed()
    server.starttls()
    server.ehlo_or_helo_if_needed()
    server.login(sender_email, password)
    server.sendmail(f"Databricks - Extractor<{sender_email}>", receiver_email, message.as_string())
    server.quit()

# Start timing and send start notification
start_time = time.time()

# Main data processing code
df = spark.sql("""
SELECT 
    a.*
FROM workspace.default.gold_financial_stocks a
""")

# Get data profile before writing
num_rows = df.count()
num_columns = len(df.columns)
estimated_size_bytes = num_rows * num_columns * 10
estimated_size_mb = round(estimated_size_bytes / (1024 * 1024), 2)
estimated_compressed_size_mb = round(estimated_size_mb * 0.15, 2)

# Generate timestamp for the file name
current_date = datetime.datetime.now().strftime('%Y%m%d')
base_path = f"/Volumes/workspace/default/data/{current_date}.csv.gz"

# Write with compression
df.coalesce(1).write.format("csv") \
    .option("header", "true") \
    .option("compression", "gzip") \
    .mode("overwrite") \
    .save(base_path)

# Get the actual file path including the part file name
output_files = dbutils.fs.ls(base_path)
part_file_path = [f for f in output_files if f.name.startswith('part-')][0].path

# Clean up the path (remove 'dbfs:' prefix if present)
if part_file_path.startswith('dbfs:'):
    part_file_path = part_file_path[5:]

# End timing and send completion notification with data profile
end_time = time.time()
formatted_end_time = datetime.datetime.fromtimestamp(end_time).strftime('%Y-%m-%d %H:%M:%S')
execution_time = round(end_time - start_time, 2)

# Get the workspace ID and construct the download URL
workspace_id = "913057a4-85b7"
download_url = f"https://dbc-{workspace_id}.cloud.databricks.com/ajax-api/2.0/fs/files{part_file_path}"

report = f"""
Data Export Summary:
-------------------
Number of Rows: {num_rows:,}
Number of Columns: {num_columns}
Compression Ratio: {round(estimated_size_mb/estimated_compressed_size_mb, 2)}:1
Execution Time: {execution_time} seconds
Location: {part_file_path}

Download Link:
{download_url}
"""

send_message(
    subject=f"{inspect.currentframe().f_code.co_name} END: {formatted_end_time} (Execution time: {execution_time} seconds)",
    body=report
)

In [0]:
import smtplib, time, datetime, inspect, os
from time import sleep
from email.mime.text import MIMEText

def send_message(subject="Email by Python", body="Default by Python"):
    """By Ricardo Kazuo"""
    # Read credentials and addresses from files
    with open('/Volumes/workspace/default/data/gmail.txt', 'r') as file:
        password = file.read().strip()
    
    with open('/Volumes/workspace/default/data/sender.txt', 'r') as file:
        sender_email = file.read().strip()
        
    with open('/Volumes/workspace/default/data/receiver.txt', 'r') as file:
        receiver_email = file.read().strip()
        
    message = MIMEText(body)
    message['to'] = receiver_email
    message['from'] = f"Databricks - Extractor<{sender_email}>"
    message['subject'] = subject
    server = smtplib.SMTP('smtp.gmail.com:587')
    server.ehlo_or_helo_if_needed()
    server.starttls()
    server.ehlo_or_helo_if_needed()
    server.login(sender_email, password)
    server.sendmail(f"Databricks - Extractor<{sender_email}>", receiver_email, message.as_string())
    server.quit()

# Start timing and send start notification
start_time = time.time()

# Main data processing code
df = spark.sql("""
SELECT 
    a.*
FROM workspace.default.gold_financial_stocks a 
""")

# Get data profile before writing
num_rows = df.count()
num_columns = len(df.columns)
estimated_size_bytes = num_rows * num_columns * 10
estimated_size_mb = round(estimated_size_bytes / (1024 * 1024), 2)
estimated_compressed_size_mb = round(estimated_size_mb * 0.15, 2)

# Generate timestamp for the file name
current_date = datetime.datetime.now().strftime('%Y%m%d')
base_path = f"/Volumes/workspace/default/data2/{current_date}.csv.gz"

# Write with compression
df.coalesce(1).write.format("csv") \
    .option("header", "true") \
    .option("compression", "gzip") \
    .mode("overwrite") \
    .save(base_path)

# Get the actual file path including the part file name
output_files = dbutils.fs.ls(base_path)
part_file_path = [f for f in output_files if f.name.startswith('part-')][0].path

# Clean up the path (remove 'dbfs:' prefix if present)
if part_file_path.startswith('dbfs:'):
    part_file_path = part_file_path[5:]

# End timing and send completion notification with data profile
end_time = time.time()
formatted_end_time = datetime.datetime.fromtimestamp(end_time).strftime('%Y-%m-%d %H:%M:%S')
execution_time = round(end_time - start_time, 2)

# Get the workspace ID and construct the download URL
workspace_id = "913057a4-85b7"
download_url = f"https://dbc-{workspace_id}.cloud.databricks.com/ajax-api/2.0/fs/files{part_file_path}"

report = f"""
Data Export Summary:
-------------------
Number of Rows: {num_rows:,}
Number of Columns: {num_columns}
Compression Ratio: {round(estimated_size_mb/estimated_compressed_size_mb, 2)}:1
Execution Time: {execution_time} seconds
Location: {part_file_path}

Download Link:
{download_url}
"""

send_message(
    subject=f"{inspect.currentframe().f_code.co_name} END: {formatted_end_time} (Execution time: {execution_time} seconds)",
    body=report
)