In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
    .appName("Process Bronze to Silver")\
    .config("spark.hadoop.fs.defaultFS", "hdfs://localhost:9000")\
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
    .getOrCreate()

In [2]:
yfinance_bronze_path = "/user/adam_majczyk2001/nifi/bronze_parquet/yfinance/"
news_bronze_path = "/user/adam_majczyk2001/nifi/bronze_parquet/news/"

yfinance_silver_path = "/user/adam_majczyk2001/nifi/silver_parquet/yfinance/"
news_silver_path = "/user/adam_majczyk2001/nifi/silver_parquet/news/"

In [4]:
yfinance_df = spark.read.parquet(yfinance_silver_path)
news_df = spark.read.parquet(news_silver_path)

                                                                                

In [4]:
yfinance_df.head()

Row(timestamp='2025-01-02T20:52:59.980215', updates_XOM=[Row(price=107.53, volume=7408883, volatility=0.197, bid_ask_spread=0.0314, market_sentiment=0.542, trading_activity=59.47, timestamp='2025-01-02T20:48:00.271262', source='real'), Row(price=107.53, volume=7408883, volatility=0.197, bid_ask_spread=0.0314, market_sentiment=0.542, trading_activity=59.47, timestamp='2025-01-02T20:48:01.474490', source='real'), Row(price=107.53, volume=7408883, volatility=0.197, bid_ask_spread=0.0314, market_sentiment=0.542, trading_activity=59.47, timestamp='2025-01-02T20:48:02.085317', source='real'), Row(price=107.36, volume=7419864, volatility=0.198, bid_ask_spread=0.0315, market_sentiment=0.523, trading_activity=62.99, timestamp='2025-01-02T20:48:02.144747', source='simulated'), Row(price=107.53, volume=7419864, volatility=0.198, bid_ask_spread=0.0315, market_sentiment=0.523, trading_activity=62.99, timestamp='2025-01-02T20:48:04.057117', source='real'), Row(price=107.53, volume=7419864, volatilit

In [5]:
news_df.head()

                                                                                

Row(title='Ostrzegają przed unijną polityką. "Zagrozi górniczym płacom i przyspieszy likwidację kopalń"', date='2024-11-28 08:41:00', keywords=['GÓRNICY', 'KOPALNIE', 'RYNEK PRACY', 'UNIA EUROPEJSKA', 'WYDOBYCIE WĘGLA', 'RYNEK WĘGLA', 'PRODUKCJA ENERGII', 'POLITYKA KLIMATYCZNA', 'HUTEK BOGUSŁAW', 'GÓRNICTWO', 'ENERGETYKA WĘGLOWA', 'EUROPEJSKI ZIELONY ŁAD'], is_premium=False, source_site='wnp.pl', url='https://www.wnp.pl/energetyka/ostrzegaja-przed-unijna-polityka-zagrozi-gorniczym-placom-i-przyspieszy-likwidacje-kopaln,892344.html', text='Do kopalń trafiła Błyskawica - publikacja Krajowej Sekcji Górnictwa Węgla Kamiennego NSZZ Solidarność poświęcona zagrożeniom, jakie wynikają z Europejskiego Zielonego Ładu. - Wprowadzenie Zielonego Ładu zagrozi istnieniu tysięcy miejsc pracy w sektorze wydobywczym - ocenia Bogusław Hutek, szef górniczej Solidarności. - Nowe przepisy, które wkrótce zaczną obowiązywać, takie jak "Krajowy Plan w dziedzinie Energii i Klimatu do 2030 roku", tworzy się w op

In [6]:
# Leave news with unique title
news_df_no_duplicates = news_df.dropDuplicates(["title"])

In [7]:
news_df_no_duplicates.head()

                                                                                

Row(title='3 tys. uwag do nowej konstytucji klimatycznej. Biznes wie, czego chce', date='2024-11-26 13:50:00', keywords=['POLITYKA ENERGETYCZNA', 'PSE', 'TGPE', 'TOKARSKI STANISŁAW', 'ELEKTROWNIE WĘGLOWE', 'ONICHIMOWSKI GRZEGORZ', 'SZULC WALDEMAR', 'TOWARZYSTWO GOSPODARCZE POLSKIE ELEKTROWNIE', 'RYNEK MOCY', 'KIG', 'KPEIK', 'MINISTERSTWO KLIMATU I ŚRODOWISKA', 'MOCE DYSPOZYCYJNE'], is_premium=False, source_site='wnp.pl', url='https://www.wnp.pl/gazownictwo/3-tys-uwag-do-nowej-konstytucji-klimatycznej-biznes-wie-czego-chce,891670.html', text='Do projektu Krajowego Planu w dziedzinie Energii i Klimatu do 2030 r. zgłoszono prawie 3 tys. uwag. Krajowa Izba Gospodarcza podkreśla potrzebę zapewnienia mocy sterowalnych w polskim systemie elektroenergetycznymi. Zdecydowanie wybrzmiewa  postulat kontynuacji rynku mocy. Dyskusja na temat projektu aktualizacji Krajowego Planu w dziedzinie Energii i Klimatu do 2030 r. (KPEiK) była głównym punktem XIII Meetingu Gospodarczego KIG, który odbył się 25

In [8]:
from pyspark.sql.functions import col, to_timestamp, date_format

# Convert "date" column to timestamp format and rename to "datetime"
news_df_no_duplicates = news_df_no_duplicates.withColumn("date", to_timestamp(col("date"), "yyyy-MM-dd"))
news_df_no_duplicates = news_df_no_duplicates.withColumnRenamed("date", "datetime")

# Format the datetime column to include microseconds in ISO 8601 format
news_df_no_duplicates = news_df_no_duplicates.withColumn("datetime", date_format(col("datetime"), "yyyy-MM-dd'T'HH:mm:ss.SSSSSS"))

In [10]:
# Example: Disable vectorized reader
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
# Disable case sensitivity
spark.conf.set("spark.sql.caseSensitive", "false")

from pyspark.sql.functions import col, explode, lit

# Explode updates for each company and rename fields
xom_updates = yfinance_df\
    .withColumn("update", explode(col("updates_XOM"))) \
    .select(
        col("timestamp").alias("record_timestamp"),
        col("update.price").alias("price"),
        col("update.volume").alias("volume"),
        col("update.volatility").alias("volatility"),
        col("update.bid_ask_spread").alias("bid_ask_spread"),
        col("update.market_sentiment").alias("market_sentiment"),
        col("update.trading_activity").alias("trading_activity"),
        col("update.timestamp").alias("update_timestamp"),
        col("update.source").alias("source"),
        lit("XOM").alias("company")
    )

bp_updates = yfinance_df\
    .withColumn("update", explode(col("updates_BP"))) \
    .select(
        col("timestamp").alias("record_timestamp"),
        col("update.price").alias("price"),
        col("update.volume").alias("volume"),
        col("update.volatility").alias("volatility"),
        col("update.bid_ask_spread").alias("bid_ask_spread"),
        col("update.market_sentiment").alias("market_sentiment"),
        col("update.trading_activity").alias("trading_activity"),
        col("update.timestamp").alias("update_timestamp"),
        col("update.source").alias("source"),
        lit("BP").alias("company")
    )

shell_updates = yfinance_df \
    .withColumn("update", explode(col("updates_SHEL"))) \
    .select(
        col("timestamp").alias("record_timestamp"),
        col("update.price").alias("price"),
        col("update.volume").alias("volume"),
        col("update.volatility").alias("volatility"),
        col("update.bid_ask_spread").alias("bid_ask_spread"),
        col("update.market_sentiment").alias("market_sentiment"),
        col("update.trading_activity").alias("trading_activity"),
        col("update.timestamp").alias("update_timestamp"),
        col("update.source").alias("source"),
        lit("SHEL").alias("company")
    )

cop_updates = yfinance_df \
    .withColumn("update", explode(col("updates_COP"))) \
    .select(
        col("timestamp").alias("record_timestamp"),
        col("update.price").alias("price"),
        col("update.volume").alias("volume"),
        col("update.volatility").alias("volatility"),
        col("update.bid_ask_spread").alias("bid_ask_spread"),
        col("update.market_sentiment").alias("market_sentiment"),
        col("update.trading_activity").alias("trading_activity"),
        col("update.timestamp").alias("update_timestamp"),
        col("update.source").alias("source"),
    )
# drop row with null values (any column)
cop_updates = cop_updates.dropna(how="any")
cop_updates = cop_updates.withColumn("company", lit("COP"))

# Union all updates
updates = xom_updates.union(bp_updates).union(shell_updates)
updates = updates.dropDuplicates(["record_timestamp", "update_timestamp", "company"])

In [11]:
# convert to pandas
import pandas as pd

In [12]:
cop_updates_pd = cop_updates.toPandas()

                                                                                