# Bronze Layer:
## Raw Data Collection and Preparation

This section focuses on collecting and preparing raw financial data from the
GPW (Warsaw Stock Exchange) archive. Two datasets are ingested:

- Stocks: Equity market data.
- Bonds: Fixed-income securities data.

The data is loaded from CSV files and standardized: columns are renamed to
English, the date columns are parsed, and a Type column is added to
differentiate between stocks and bonds. The processed data is saved in Parquet
format for efficient storage and querying in subsequent layers.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    DoubleType,
    IntegerType,
    LongType,
)

# Create the Spark session for this notebook (an already-running one is reused)
spark = SparkSession.builder.appName("GPW Market Analysis").getOrCreate()

# Explicit schema applied when reading the raw CSV files. Every column is
# nullable, and both date columns ("Data" and "Date") are read as plain strings.
base_schema = (
    StructType()
    .add("Data", StringType(), True)
    .add("Nazwa", StringType(), True)
    .add("ISIN", StringType(), True)
    .add("Waluta", StringType(), True)
    .add("Kurs_otwarcia", DoubleType(), True)
    .add("Kurs_max", DoubleType(), True)
    .add("Kurs_min", DoubleType(), True)
    .add("Kurs_zamkniecia", DoubleType(), True)
    .add("Zmiana", DoubleType(), True)
    .add("Wolumen", LongType(), True)
    .add("Liczba_Transakcji", IntegerType(), True)
    .add("Obrot", DoubleType(), True)
    .add("Liczba_otwartych_pozycji", IntegerType(), True)
    .add("Wartosc_otwartych_pozycji", DoubleType(), True)
    .add("Cena_nominalna", DoubleType(), True)
    .add("Date", StringType(), True)
)

In [None]:
from pyspark.sql.functions import col, to_date, lit

# Read the stocks CSV with the predefined schema
stocks = spark.read.csv("work/data/gpw_stocks.csv", header=True, schema=base_schema)

# Polish -> English column names
column_mapping = {
    "Data": "DatePL",
    "Nazwa": "Name",
    "ISIN": "ISIN",
    "Waluta": "Currency",
    "Kurs_otwarcia": "Open",
    "Kurs_max": "High",
    "Kurs_min": "Low",
    "Kurs_zamkniecia": "Close",
    "Zmiana": "Change",
    "Wolumen": "Volume",
    "Liczba_Transakcji": "Transactions",
    "Obrot": "Turnover",
    "Liczba_otwartych_pozycji": "OpenPositionsCount",
    "Wartosc_otwartych_pozycji": "OpenPositionsValue",
    "Cena_nominalna": "NominalPrice",
    "Date": "DateEN",
}

# Rename every column in one pass; names without a mapping are kept as they are
stocks = stocks.toDF(*[column_mapping.get(name, name) for name in stocks.columns])

# Convert the two string date columns into real dates (each has its own format)
stocks = stocks.withColumn("DatePL", to_date(col("DatePL"), "yyyy-MM-dd"))
stocks = stocks.withColumn("DateEN", to_date(col("DateEN"), "dd-MM-yyyy"))

# Tag every row as a stock
stocks = stocks.withColumn("Type", lit("stock"))

In [None]:
# Read the bonds CSV with the same schema as the stocks file
bonds = spark.read.csv("work/data/gpw_bonds.csv", header=True, schema=base_schema)

# Rename every column in one pass using the shared Polish -> English mapping;
# names without a mapping are kept as they are
bonds = bonds.toDF(*[column_mapping.get(name, name) for name in bonds.columns])

# Convert the two string date columns into real dates (each has its own format)
bonds = bonds.withColumn("DatePL", to_date(col("DatePL"), "yyyy-MM-dd"))
bonds = bonds.withColumn("DateEN", to_date(col("DateEN"), "dd-MM-yyyy"))

# Tag every row as a bond
bonds = bonds.withColumn("Type", lit("bond"))

In [None]:
# Combine stocks and bonds data.
# Columns are matched by name rather than by position, so a change in column
# order on either side cannot silently put values under the wrong column.
financial_data = stocks.unionByName(bonds)

# Save to Parquet, replacing the output of any previous run
financial_data.write.parquet("work/output/gpw_processed.parquet", mode="overwrite")
print("Bronze layer data saved successfully to work/output/gpw_processed.parquet")

# Silver Layer: 
## Data Cleaning and Feature Engineering
The Silver layer builds on the Bronze layer by cleaning the raw data and
engineering features for analysis. Key steps include:

- Cleaning: Handling null values and filtering invalid records.
- Feature Engineering: Adding metrics like daily returns, volatility, moving averages, and liquidity scores.
- Output: Saving the enriched dataset for use in the Gold layer.

In [None]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window

# Read the Bronze output back in
silver_df = spark.read.parquet("work/output/gpw_processed.parquet")

# Replace nulls in selected columns with a default value
null_defaults = {"Currency": "PLN", "OpenPositionsCount": 0, "Change": 0}
silver_df = silver_df.na.fill(null_defaults)

# Keep only rows that have an opening price, a positive close and a positive volume
silver_df = (
    silver_df.where(col("Open").isNotNull())
    .where(col("Close") > 0)
    .where(col("Volume") > 0)
)

In [None]:
# Per-instrument window: rows of one ISIN, ordered by date
window_spec = Window.partitionBy("ISIN").orderBy("DatePL")

# Add features
silver_df = (
    # Close of the previous row for the same ISIN (null on the first row)
    silver_df.withColumn("PrevClose", lag("Close", 1).over(window_spec))
    # Relative change versus the previous close; null when there is no
    # positive previous close to divide by
    .withColumn(
        "DailyReturn",
        when(
            col("PrevClose") > 0, (col("Close") - col("PrevClose")) / col("PrevClose")
        ),
    )
    # High-low range relative to the open. Guarded like DailyReturn: the earlier
    # filter only ensures Open is not null, so Open can still be 0, and dividing
    # by zero raises an error when Spark runs in ANSI SQL mode. A zero open
    # yields null instead.
    .withColumn(
        "IntradayVolatility",
        when(col("Open") != 0, (col("High") - col("Low")) / col("Open")),
    )
    # Mean close over the current row and up to four preceding rows
    .withColumn("MA5", avg("Close").over(window_spec.rowsBetween(-4, 0)))
    # Abbreviated day-of-week name derived from DatePL
    .withColumn("DayOfWeek", date_format(col("DatePL"), "E"))
    # Close price multiplied by traded volume
    .withColumn("DollarVolume", col("Close") * col("Volume"))
)

In [None]:
# Dataset-wide mean turnover and mean dollar volume, fetched as a single Row
market_avg = silver_df.agg(
    avg("Turnover").alias("avg_turnover"),
    avg("DollarVolume").alias("avg_dollar_vol"),
).head()

# LiquidityScore: product of the log10 of (1 + value relative to the market mean)
# for turnover and for dollar volume
relative_turnover = col("Turnover") / market_avg["avg_turnover"]
relative_dollar_vol = col("DollarVolume") / market_avg["avg_dollar_vol"]
liquidity_score = log10(relative_turnover + 1) * log10(relative_dollar_vol + 1)
silver_df = silver_df.withColumn("LiquidityScore", liquidity_score)

# SizeBucket: three tiers based on fixed DollarVolume thresholds
dollar_volume = col("DollarVolume")
size_bucket = (
    when(dollar_volume > 1e6, "LargeCap")
    .when(dollar_volume > 1e5, "MidCap")
    .otherwise("SmallCap")
)
silver_df = silver_df.withColumn("SizeBucket", size_bucket)

In [None]:
# Persist the Silver dataset as Parquet, replacing the output of any previous run
silver_df.write.mode("overwrite").parquet("work/output/gpw_silver.parquet")
print("Silver layer data saved successfully to work/output/gpw_silver.parquet")

# Gold Layer:
## Advanced Analysis and Insights
The Gold layer leverages the cleaned and enriched data from the Silver layer to
generate actionable insights. This includes:

- Top Performers: Identifying stocks with the highest volatility and trading volume.
- Market Relationships: Calculating the correlation between DS bonds and stocks.
- Summary: Providing a concise analysis of key findings.

In [None]:
# Read the Silver output back in
gold_df = spark.read.parquet("work/output/gpw_silver.parquet")

# Per-stock standard deviation of daily returns plus the number of rows seen
stock_rows = gold_df.where(col("Type") == "stock")
return_stats = stock_rows.groupBy("ISIN", "Name").agg(
    stddev("DailyReturn").alias("Volatility"),
    count("*").alias("TradingDays"),
)

# Keep stocks with at least 10 rows, then take the 10 with the highest volatility
top_volatility = (
    return_stats.where(col("TradingDays") >= 10)
    .orderBy(col("Volatility").desc())
    .limit(10)
)
print("Top 10 Most Volatile Stocks:")
top_volatility.show(truncate=False)

In [None]:
# Per-stock mean volume and mean turnover
equity_rows = gold_df.where(col("Type") == "stock")
volume_stats = equity_rows.groupBy("ISIN", "Name").agg(
    avg("Volume").alias("AvgVolume"),
    avg("Turnover").alias("AvgTurnover"),
)

# The 10 stocks with the highest mean volume
top_volume = volume_stats.orderBy(col("AvgVolume").desc()).limit(10)
print("Top 10 Stocks by Average Volume:")
top_volume.show(truncate=False)

In [None]:
# Correlation between DS bonds and stocks.
# For each date, compute the mean close of bonds whose name starts with "DS"
# and the mean close of all stocks; dates missing either average are dropped.
bond_stock_corr = (
    gold_df.groupBy("DatePL")
    .agg(
        avg(
            when((col("Type") == "bond") & (col("Name").startswith("DS")), col("Close"))
        ).alias("Avg_Bond_Close"),
        avg(when(col("Type") == "stock", col("Close"))).alias("Avg_Stock_Close"),
    )
    .na.drop()
)
corr_value = bond_stock_corr.select(
    corr("Avg_Bond_Close", "Avg_Stock_Close").alias("Correlation")
).collect()[0][0]

# The aggregate comes back as None when no date has both averages. Substitute
# NaN so that the ":.4f" formatting (here and wherever corr_value is reused)
# does not raise TypeError.
if corr_value is None:
    corr_value = float("nan")
print(f"Correlation between DS bonds and stocks: {corr_value:.4f}")

In [None]:
# Write each Gold result to its own Parquet location, replacing earlier output
gold_outputs = (
    (top_volatility, "work/output/gold/top_volatility.parquet"),
    (top_volume, "work/output/gold/top_volume.parquet"),
    (bond_stock_corr, "work/output/gold/bond_stock_corr.parquet"),
)
for result_df, target_path in gold_outputs:
    result_df.write.parquet(target_path, mode="overwrite")
print("Gold layer analysis saved successfully!")

# Final Analysis
The Gold layer analysis reveals key insights into the GPW market:

- Volatility: Stocks like INTAKUS and SKYSTONE exhibit high return volatility, indicating potential risk or opportunity.
- Volume: High-volume stocks (e.g., BIOTON, PGNIG) suggest strong market interest and liquidity.
- Bond-Stock Correlation: A weak negative correlation (-0.1191) between the daily average closing prices of DS bonds and stocks suggests a potential diversification benefit.

These findings can guide investment decisions or further research into specific instruments.

In [None]:
import matplotlib.pyplot as plt
# Bring the top-volatility result to the driver as a pandas DataFrame
volatility_pd = top_volatility.toPandas()

# Bar chart: one bar per stock name, bar height = volatility
fig, ax = plt.subplots(figsize=(12, 6))
bars = ax.bar(volatility_pd['Name'], volatility_pd['Volatility'], color='#FF6B6B')
ax.set_title('Top 10 Volatile Stocks (GPW)', fontsize=14)
ax.set_xlabel('Stock', fontsize=12)
ax.set_ylabel('Volatility (STD of Daily Returns)', fontsize=12)
plt.xticks(rotation=45, ha='right')
ax.grid(axis='y', linestyle='--', alpha=0.7)

# Print each bar's value just above its top edge
for rect in bars:
    bar_top = rect.get_height()
    bar_center = rect.get_x() + rect.get_width()/2.
    ax.text(bar_center, bar_top*1.01, f'{bar_top:.2f}', ha='center', va='bottom')

fig.tight_layout()
plt.show()

In [None]:
# Bring the top-volume result to the driver as a pandas DataFrame
volume_pd = top_volume.toPandas()

# Bar chart: one bar per stock name, bar height = average volume in millions
fig, ax = plt.subplots(figsize=(12, 6))
bars = ax.bar(volume_pd['Name'], volume_pd['AvgVolume']/1e6, color='#4ECDC4')
ax.set_title('Top 10 Stocks by Average Volume (GPW)', fontsize=14)
ax.set_xlabel('Stock', fontsize=12)
ax.set_ylabel('Average Volume (Millions)', fontsize=12)
plt.xticks(rotation=45, ha='right')
ax.grid(axis='y', linestyle='--', alpha=0.7)

# Print each bar's value just above its top edge
for rect in bars:
    bar_top = rect.get_height()
    bar_center = rect.get_x() + rect.get_width()/2.
    ax.text(bar_center, bar_top*1.01, f'{bar_top:.1f}M', ha='center', va='bottom')

fig.tight_layout()
plt.show()

In [None]:
# Prepare data: per-date averages, sorted chronologically, as a pandas DataFrame
corr_pd = bond_stock_corr.orderBy('DatePL').toPandas()

# Line chart of both daily average series on a shared axis
plt.figure(figsize=(14, 7))
plt.plot(corr_pd['DatePL'], corr_pd['Avg_Stock_Close'], 
         label='Stocks', linewidth=2, color='#5563DE')
plt.plot(corr_pd['DatePL'], corr_pd['Avg_Bond_Close'], 
         label='DS Bonds', linewidth=2, color='#FFA94D')

plt.title('Market Trend: Stocks vs DS Bonds', fontsize=16)
plt.xlabel('Date', fontsize=12)
# The plotted series are raw per-date average closes (no normalization is
# applied anywhere), so the axis is labelled accordingly.
plt.ylabel('Average Closing Price', fontsize=12)
plt.grid(linestyle='--', alpha=0.7)
plt.legend()

# Annotate correlation value. It may be None when the correlation could not be
# computed; formatting None with ":.4f" would raise TypeError, so show "n/a".
corr_text = 'n/a' if corr_value is None else f'{corr_value:.4f}'
plt.figtext(0.15, 0.8, f'Correlation: {corr_text}', 
            fontsize=12, bbox={'facecolor':'white', 'alpha':0.8})

plt.tight_layout()
plt.show()