In [0]:
%python
from pyspark.sql.functions import col, to_date, round, lit, current_date, date_sub, when

# Load the bronze historic prices (without the <PER> and <OPENINT> columns)
# and the ticker -> company-name lookup table.
historic_prices = (
    spark.read.format("delta")
    .table("plstocks.bronze_historic_prices")
    .drop("<PER>", "<OPENINT>")
)
company_lookup = spark.read.format("delta").table("plstocks.bronze_company_name_lookup")

# Left join keeps every price row even when the ticker has no lookup entry;
# the lookup's own `ticker` key is dropped because <TICKER> carries the value.
joined_prices = (
    historic_prices
    .join(company_lookup, col("<TICKER>") == col("ticker"), "left")
    .drop("ticker")
)

# OPEN/HIGH/LOW/CLOSE are rounded to 2 decimal places and stored as double.
# NOTE: `round` here is pyspark.sql.functions.round (imported above), not the builtin.
rounded_price_columns = [
    round(col(f"<{price_name}>"), 2).cast("double").alias(price_name)
    for price_name in ("OPEN", "HIGH", "LOW", "CLOSE")
]

# Build the silver frame in one projection, in the target column order:
#   - <DATE> is cast to string and parsed as a yyyyMMdd date,
#   - the bracketed source names are replaced by plain upper-case names,
#   - ACTIVE is True exactly when the lookup supplied a company name.
silver_df = joined_prices.select(
    col("<TICKER>").alias("TICKER"),
    col("stock_name").alias("STOCK_NAME"),
    to_date(col("<DATE>").cast("string"), "yyyyMMdd").alias("DATE"),
    col("stock_name").isNotNull().alias("ACTIVE"),
    *rounded_price_columns,
    col("<VOL>").alias("VOL"),
)

# The direct write of this frame is intentionally disabled; the table
# plstocks.silver_stocks_price is written from combined_df in the last cell.
#silver_df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable("plstocks.silver_stocks_price")

display(silver_df)

In [0]:
%python
from pyspark.sql.functions import col, to_date, max, min, first, last, count, date_format
from pyspark.sql.functions import when

# `expr` gives access to the Spark SQL aggregates min_by / max_by used below.
from pyspark.sql.functions import expr

# Load the raw streamed prices and attach company names from the lookup table.
# Joining on the shared column name produces a single, unambiguous `ticker`
# column, so later steps do not depend on fully qualified table-name
# qualifiers surviving the join and the aggregation.
raw_stream_df = (
    spark.read.format("delta").table("plstocks.bronze_prices_raw_stream")
    .join(
        spark.read.format("delta").table("plstocks.bronze_company_name_lookup"),
        on="ticker",
        how="left",
    )
)

# Derive the calendar date of each record from its `time` column.
raw_stream_df = raw_stream_df.withColumn("DATE", to_date(date_format(col("time"), "yyyy-MM-dd")))

# Aggregate to one row per ticker / company / day.
#   HIGH / LOW   - max / min price of the day.
#   OPEN / CLOSE - price at the earliest / latest `time` of the day. first()/last()
#                  without an explicit ordering depend on row order after the
#                  shuffle and are non-deterministic, so min_by / max_by keyed
#                  on `time` are used instead. Rows sharing the exact same
#                  `time` can still tie.
#                  NOTE(review): assumes `time` sorts chronologically
#                  (timestamp or ISO-formatted string) - confirm.
#   VOL          - number of price records (a count, not traded volume).
# NOTE: `max` / `min` are pyspark.sql.functions, not the Python builtins.
aggregated_df = raw_stream_df.groupBy("ticker", "stock_name", "DATE").agg(
    max(col("price")).alias("HIGH"),
    min(col("price")).alias("LOW"),
    expr("max_by(`price`, `time`)").alias("CLOSE"),
    expr("min_by(`price`, `time`)").alias("OPEN"),
    count(col("price")).alias("VOL"),
)

# ACTIVE is True exactly when the lookup supplied a company name.
silver2_df = aggregated_df.withColumn("ACTIVE", col("stock_name").isNotNull())

# Project into the same column names and order as silver_df so that the
# later unionByName matches regardless of spark.sql.caseSensitive.
silver2_df = silver2_df.select(
    col("ticker").alias("TICKER"),
    col("stock_name").alias("STOCK_NAME"),
    "DATE",
    "ACTIVE",
    "OPEN",
    "HIGH",
    "LOW",
    "CLOSE",
    "VOL",
)

# Show the result.
display(silver2_df)

In [0]:
%python
# Append the stream-derived daily rows (silver2_df) to the historical rows
# (silver_df); unionByName matches columns by name rather than by position.
combined_df = silver_df.unionByName(other=silver2_df)

# Render the combined DataFrame in the notebook output.
display(combined_df)

In [0]:
# Persist the combined frame as the Delta table plstocks.silver_stocks_price.
# mode("overwrite") replaces the existing data and overwriteSchema=true lets
# the stored schema be replaced by the schema of combined_df.
(
    combined_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("plstocks.silver_stocks_price")
)