In [2]:
# Init spark session to read data from parquet files
from pyspark.sql import SparkSession
from pyspark.sql import SparkSession
from pyspark.sql.types import DateType, IntegerType
from pyspark.sql import functions as F


In [3]:
# Importing the data
spark = SparkSession.builder.appName("data_processing").getOrCreate()
df_stock = spark.read.parquet("fetch_data/raw_data/parquets/stock_prices.parquet")
df_last_updated = spark.read.parquet("fetch_data/raw_data/parquets/last_update.parquet")


In [4]:
# show the data
df_stock.show(5)
df_last_updated.show(5)

+----------+---------+---------+---------+---------+------+------+
|trade_date|     open|     high|      low|    close|volume|symbol|
+----------+---------+---------+---------+---------+------+------+
|1973-01-17|13.634607|13.634607|13.634607|13.634607|     0|   AIG|
|1973-02-08|12.481533|12.481533|12.481533|12.481533|     0|   AIG|
|1973-02-21|12.380376|12.380376|12.380376|12.380376|     0|   AIG|
|1973-02-22|12.339923|12.339923|12.339923|12.339923|     0|   AIG|
|1973-04-09|11.652126|11.652126|11.652126|11.652126|     0|   AIG|
+----------+---------+---------+---------+---------+------+------+
only showing top 5 rows

+------+-----------+
|symbol|last_update|
+------+-----------+
|   XOS| 2025-03-04|
|  XTKG| 2025-03-04|
|  YCBD| 2025-03-04|
|    WU| 2025-03-04|
|   XOM| 2025-03-04|
+------+-----------+
only showing top 5 rows



In [5]:
# Check what fields are duplicated
duplicates = df_stock.groupBy("symbol", "trade_date").count().filter("count > 1")
print(f"Records with same symbol and date: {duplicates.count()}")

# Sample some duplicates to examine
if duplicates.count() > 0:
    print("Sample duplicates:")
    duplicates.join(df_stock, ["symbol", "trade_date"]).orderBy("symbol", "trade_date").show(10)

Records with same symbol and date: 6483
Sample duplicates:
+------+----------+-----+---------+---------+---------+---------+------+
|symbol|trade_date|count|     open|     high|      low|    close|volume|
+------+----------+-----+---------+---------+---------+---------+------+
|   NAN|1999-05-26|    2|3.5172732|3.5319285|3.5172732|3.5172732|218400|
|   NAN|1999-05-26|    2|3.5172706| 3.531926|3.5172706|3.5172706|218400|
|   NAN|1999-05-27|    2|3.5172732| 3.546584|3.5172732|3.5172732| 38600|
|   NAN|1999-05-27|    2|3.5172706|3.5465813|3.5172706|3.5172706| 38600|
|   NAN|1999-05-28|    2| 3.517274|3.5465846| 3.517274|3.5465846| 16100|
|   NAN|1999-05-28|    2| 3.517269|3.5465794| 3.517269|3.5465794| 16100|
|   NAN|1999-06-01|    2|3.5319285|3.5319285|3.5172732|3.5172732| 27300|
|   NAN|1999-06-01|    2| 3.531926| 3.531926|3.5172706|3.5172706| 27300|
|   NAN|1999-06-02|    2|3.5319285|3.5319285|3.5172732|3.5172732| 27100|
|   NAN|1999-06-02|    2| 3.531926| 3.531926|3.5172706|3.5172706|

In [15]:
from datetime import date


outdated_symbols = df_last_updated.filter(F.col(df_last_updated.columns[0]) == date.today()).select(df_last_updated.columns[1]).collect()
print(outdated_symbols)

# convert to spark dataframe
outdated_symbols = [row[0] for row in outdated_symbols]
print(outdated_symbols)

df_stock_filtered = df_stock.filter(F.col("symbol").isin(outdated_symbols))
df_stock_filtered.show(5)

[Row(symbol='GLDD'), Row(symbol='EWCZ'), Row(symbol='ALGT'), Row(symbol='GLSI'), Row(symbol='EVBN'), Row(symbol='DMB'), Row(symbol='BFRI'), Row(symbol='CMLS'), Row(symbol='AKRO'), Row(symbol='EIG'), Row(symbol='CXM'), Row(symbol='FNGD'), Row(symbol='GNTX'), Row(symbol='CCJ'), Row(symbol='GASS'), Row(symbol='ALF'), Row(symbol='EXEEW'), Row(symbol='CLIR'), Row(symbol='ADIL'), Row(symbol='CELU'), Row(symbol='EXAS'), Row(symbol='AMG'), Row(symbol='CF'), Row(symbol='FSFG'), Row(symbol='BRO'), Row(symbol='FNLC'), Row(symbol='ACHL'), Row(symbol='CNF'), Row(symbol='COCH'), Row(symbol='ARCO'), Row(symbol='EDTK'), Row(symbol='FF'), Row(symbol='AIZ'), Row(symbol='FCT'), Row(symbol='CRF'), Row(symbol='ERC'), Row(symbol='ATI'), Row(symbol='GES'), Row(symbol='CCIXW'), Row(symbol='FWRD'), Row(symbol='CYTH'), Row(symbol='CCU'), Row(symbol='BKTI'), Row(symbol='AMRC'), Row(symbol='FPAY'), Row(symbol='FAAS'), Row(symbol='CNCK'), Row(symbol='BGS'), Row(symbol='ENOV'), Row(symbol='DBRG'), Row(symbol='BLDEW