In [1]:
!pip install pyspark



In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("SaudiStocks") \
    .master("local[*]") \
    .getOrCreate()

spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


25/12/03 03:01:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [11]:
from pyspark.sql import functions as F

path = "/Users/foofaa/Desktop/saudi-stocks-spark/data/raw/2222.SR.csv"

df = spark.read.csv(
    path,
    header=True,
    inferSchema=True
)

df.printSchema()
df.show(5)

root
 |-- Date: timestamp (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Open: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Symbol: string (nullable = true)

+-------------------+------------------+------------------+------------------+------------------+------------------+---------+-------+
|               Date|         Adj Close|             Close|              High|               Low|              Open|   Volume| Symbol|
+-------------------+------------------+------------------+------------------+------------------+------------------+---------+-------+
|2019-12-11 00:00:00| 22.35260581970215| 29.09090805053711| 29.09090805053711| 29.09090805053711| 29.09090805053711| 38289394|2222.SR|
|2019-12-12 00:00:00|23.368629455566406|30.413223266601562|31.983470916748047|29.752065658569336|31.983470916748047|505692621|2222.SR|
|2019-12-15 00:00:

In [13]:
#adding year and month for each row
from pyspark.sql import functions as F

df_ym = df.withColumn("Year",  F.year("Date")) \
          .withColumn("Month", F.month("Date"))

df_ym.select("Date", "Close", "Year", "Month").show(5)

+-------------------+------------------+----+-----+
|               Date|             Close|Year|Month|
+-------------------+------------------+----+-----+
|2019-12-11 00:00:00| 29.09090805053711|2019|   12|
|2019-12-12 00:00:00|30.413223266601562|2019|   12|
|2019-12-15 00:00:00|30.909090042114258|2019|   12|
|2019-12-16 00:00:00|31.404958724975586|2019|   12|
|2019-12-17 00:00:00|31.198347091674805|2019|   12|
+-------------------+------------------+----+-----+
only showing top 5 rows



In [15]:
#monthly stats

monthly_stats = (
    df_ym.groupBy("Year", "Month")
         .agg(
             F.min("Close").alias("min_close"),
             F.max("Close").alias("max_close"),
             F.avg("Close").alias("avg_close")
         )
         .orderBy("Year", "Month")
)

monthly_stats.show(50, truncate=False)

+----+-----+------------------+------------------+------------------+
|Year|Month|min_close         |max_close         |avg_close         |
+----+-----+------------------+------------------+------------------+
|2019|12   |29.049585342407227|31.404958724975586|29.75206578572591 |
|2020|1    |28.223140716552734|29.049585342407227|28.56123161315918 |
|2020|2    |27.355371475219727|28.14049530029297 |27.70247926712036 |
|2020|3    |22.97520637512207 |27.314048767089844|24.9083716351053  |
|2020|4    |24.29751968383789 |26.48760223388672 |25.565363710576836|
|2020|5    |24.79338836669922 |27.561983108520508|26.177685379981995|
|2020|6    |26.611570358276367|27.355371475219727|26.925243637778543|
|2020|7    |27.02479362487793 |27.644628524780273|27.355371284484864|
|2020|8    |27.231403350830078|29.380165100097656|28.073074842754163|
|2020|9    |29.132230758666992|30.537189483642578|29.903580711001442|
|2020|10   |27.851238250732422|29.752065658569336|29.21487553914388 |
|2020|11   |27.76859

In [17]:
#Cheapest and Most expensive avg close month in last 7 years

cheapest_month = monthly_stats.orderBy("avg_close").limit(1)
most_expensive_month = monthly_stats.orderBy(F.desc("avg_close")).limit(1)

print("Cheapest month (by avg close):")
cheapest_month.show()

print("Most expensive month (by avg close):")
most_expensive_month.show()

Cheapest month (by avg close):
+----+-----+----------------+------------------+------------------+
|Year|Month|       min_close|         max_close|         avg_close|
+----+-----+----------------+------------------+------------------+
|2025|    9|23.1299991607666|25.059999465942383|23.965714227585565|
+----+-----+----------------+------------------+------------------+

Most expensive month (by avg close):
+----+-----+----------------+----------------+-----------------+
|Year|Month|       min_close|       max_close|        avg_close|
+----+-----+----------------+----------------+-----------------+
|2022|    5|35.6363639831543|38.6363639831543|37.32208379109701|
+----+-----+----------------+----------------+-----------------+



In [21]:
#Highest volume trading months

monthly_volume = (
    df_ym.groupBy("Year", "Month")
         .agg(F.sum("Volume").alias("total_volume"))
         .orderBy(F.desc("total_volume"))
)

monthly_volume.show(10, truncate=False)  

+----+-----+------------+
|Year|Month|total_volume|
+----+-----+------------+
|2019|12   |1259795227  |
|2024|6    |1143572939  |
|2024|11   |516454704   |
|2022|3    |501497910   |
|2024|8    |408377236   |
|2023|8    |397802543   |
|2024|1    |396155566   |
|2020|3    |379113435   |
|2024|12   |359802113   |
|2024|7    |331523575   |
+----+-----+------------+
only showing top 10 rows



In [23]:
#last 7 years avg

yearly_avg = (
    df_ym.groupBy("Year")
         .agg(F.avg("Close").alias("year_avg_close"))
         .orderBy("Year")
)

yearly_avg.show()

+----+------------------+
|Year|    year_avg_close|
+----+------------------+
|2019| 29.75206578572591|
|2020|27.741924361878656|
|2021|29.296197898864747|
|2022|33.327811841041814|
|2023|31.999269508453736|
|2024|29.125000076293944|
|2025|25.505217336571736|
+----+------------------+



In [25]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

df_year = df_ym.select("Year", "Date", "Close")
w_year_asc  = Window.partitionBy("Year").orderBy("Date")
w_year_desc = Window.partitionBy("Year").orderBy(F.col("Date").desc())

df_ranked = (
    df_year
    .withColumn("rn_asc",  F.row_number().over(w_year_asc))   # أول يوم في السنة
    .withColumn("rn_desc", F.row_number().over(w_year_desc))  # آخر يوم في السنة
)

# أول إقفال في كل سنة
first = (
    df_ranked
    .filter(F.col("rn_asc") == 1)
    .select("Year", F.col("Close").alias("first_close"))
)

# آخر إقفال في كل سنة
last = (
    df_ranked
    .filter(F.col("rn_desc") == 1)
    .select("Year", F.col("Close").alias("last_close"))
)

# ونحسب العائد السنوي
year_returns = (
    first.join(last, "Year")
    .withColumn(
        "year_return_pct",
        F.round((F.col("last_close") / F.col("first_close") - 1) * 100, 2)
    )
    .orderBy("Year")
)

year_returns.show(truncate=False)

#It gives you the return of an investor who bought at the beginning of the year and sold at the end.

+----+------------------+------------------+---------------+
|Year|first_close       |last_close        |year_return_pct|
+----+------------------+------------------+---------------+
|2019|29.09090805053711 |29.132230758666992|0.14           |
|2020|29.008264541625977|28.92561912536621 |-0.28          |
|2021|28.801652908325195|29.586776733398438|2.73           |
|2022|29.380165100097656|29.18181800842285 |-0.68          |
|2023|29.454544067382812|33.0              |12.04          |
|2024|33.04999923706055 |28.049999237060547|-15.13         |
|2025|28.049999237060547|24.399999618530273|-13.01         |
+----+------------------+------------------+---------------+



In [29]:
#Best and worst years for investment

best_year = year_returns.orderBy(F.col("year_return_pct").desc()).limit(1)
worst_year = year_returns.orderBy("year_return_pct").limit(1)

print(" أفضل سنة للاستثمار (حسب عائد أول السنة إلى آخرها):")
best_year.show(truncate=False)

print(" أسوأ سنة للاستثمار:")
worst_year.show(truncate=False)

 أفضل سنة للاستثمار (حسب عائد أول السنة إلى آخرها):
+----+------------------+----------+---------------+
|Year|first_close       |last_close|year_return_pct|
+----+------------------+----------+---------------+
|2023|29.454544067382812|33.0      |12.04          |
+----+------------------+----------+---------------+

 أسوأ سنة للاستثمار:
+----+-----------------+------------------+---------------+
|Year|first_close      |last_close        |year_return_pct|
+----+-----------------+------------------+---------------+
|2024|33.04999923706055|28.049999237060547|-15.13         |
+----+-----------------+------------------+---------------+



In [31]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
 
w_year = (
    Window
    .partitionBy("Year")
    .orderBy("Date")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

df_dd = (
    df_ym
    .withColumn("running_max_close", F.max("Close").over(w_year))    #The highest closing price reached from the beginning of the year until today.
    .withColumn(
        "drawdown_pct",
        (F.col("Close") - F.col("running_max_close")) / F.col("running_max_close") * 100
    )
)    #If the price is at the top = 0   if else -

df_dd.select("Date", "Year", "Close", "running_max_close", "drawdown_pct").show(10, truncate=False)

+-------------------+----+------------------+------------------+-------------------+
|Date               |Year|Close             |running_max_close |drawdown_pct       |
+-------------------+----+------------------+------------------+-------------------+
|2019-12-11 00:00:00|2019|29.09090805053711 |29.09090805053711 |0.0                |
|2019-12-12 00:00:00|2019|30.413223266601562|30.413223266601562|0.0                |
|2019-12-15 00:00:00|2019|30.909090042114258|30.909090042114258|0.0                |
|2019-12-16 00:00:00|2019|31.404958724975586|31.404958724975586|0.0                |
|2019-12-17 00:00:00|2019|31.198347091674805|31.404958724975586|-0.6578949366249863|
|2019-12-18 00:00:00|2019|30.330577850341797|31.404958724975586|-3.421054885129846 |
|2019-12-19 00:00:00|2019|29.338842391967773|31.404958724975586|-6.578949366249863 |
|2019-12-22 00:00:00|2019|29.173553466796875|31.404958724975586|-7.105264100869936 |
|2019-12-23 00:00:00|2019|29.42148780822754 |31.404958724975586|-

In [35]:
#lowest value of drawdown_pct

year_max_dd = (
    df_dd
    .groupBy("Year")
    .agg(
        (-F.min("drawdown_pct")).alias("max_drawdown_pct_pos")  
    )
    .orderBy("Year")
)

year_max_dd.show(truncate=False)

+----+--------------------+
|Year|max_drawdown_pct_pos|
+----+--------------------+
|2019|7.500004706884678   |
|2020|20.910380976824626  |
|2021|8.575200708955162   |
|2022|28.823530050627493  |
|2023|6.777113068690433   |
|2024|19.8804226433582    |
|2025|18.123896776047427  |
+----+--------------------+



In [41]:
#Stock Volatility within each year

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w_daily = (
    Window
    .partitionBy("Year")
    .orderBy("Date")
)

df_ret = (
    df_ym
    .withColumn("prev_close", F.lag("Close").over(w_daily))
    .withColumn(
        "daily_return_pct",
        (F.col("Close") / F.col("prev_close") - 1) * 100
    )
)

df_ret_filtered = df_ret.filter(F.col("prev_close").isNotNull())

year_vol = (
    df_ret_filtered
    .groupBy("Year")
    .agg(
        F.round(F.avg("daily_return_pct"), 3).alias("avg_daily_return_pct"),
        F.round(F.stddev("daily_return_pct"), 3).alias("volatility_pct")
    )
    .orderBy("Year")
)

year_vol.show(truncate=False)

+----+--------------------+--------------+
|Year|avg_daily_return_pct|volatility_pct|
+----+--------------------+--------------+
|2019|0.027               |1.905         |
|2020|0.009               |1.405         |
|2021|0.013               |0.618         |
|2022|0.007               |1.432         |
|2023|0.05                |0.901         |
|2024|-0.063              |0.754         |
|2025|-0.056              |0.966         |
+----+--------------------+--------------+



In [43]:
#The biggest rally and the biggest crash in last 7 years

best_day = (
    df_ret_filtered
    .orderBy(F.desc("daily_return_pct"))
    .select("Date", "Close", "daily_return_pct")
    .limit(1)
)

worst_day = (
    df_ret_filtered
    .orderBy("daily_return_pct")
    .select("Date", "Close", "daily_return_pct")
    .limit(1)
)

print("أقوى يوم صعود:")
best_day.show(truncate=False)

print("أسوأ يوم هبوط:")
worst_day.show(truncate=False)

أقوى يوم صعود:
+-------------------+-----------------+-----------------+
|Date               |Close            |daily_return_pct |
+-------------------+-----------------+-----------------+
|2020-03-10 00:00:00|25.74380111694336|9.876539591782386|
+-------------------+-----------------+-----------------+

أسوأ يوم هبوط:
+-------------------+-----------------+------------------+
|Date               |Close            |daily_return_pct  |
+-------------------+-----------------+------------------+
|2020-03-08 00:00:00|24.79338836669922|-9.090905276211657|
+-------------------+-----------------+------------------+



In [53]:
from pyspark.sql import functions as F

# أعلى إغلاق في كل التاريخ
highest_close = (
    df
    .orderBy(F.desc("Close"))
    .select("Date", "Close")
    .limit(1)
)

# أقل إغلاق في كل التاريخ
lowest_close = (
    df
    .orderBy("Close")
    .select("Date", "Close")
    .limit(1)
)

print("أعلى سعر إغلاق في السبع سنوات:")
highest_close.show(truncate=False)

print("أقل سعر إغلاق في السبع سنوات:")
lowest_close.show(truncate=False)

أعلى سعر إغلاق في السبع سنوات:
+-------------------+----------------+
|Date               |Close           |
+-------------------+----------------+
|2022-05-16 00:00:00|38.6363639831543|
+-------------------+----------------+

أقل سعر إغلاق في السبع سنوات:
+-------------------+-----------------+
|Date               |Close            |
+-------------------+-----------------+
|2020-03-16 00:00:00|22.97520637512207|
+-------------------+-----------------+



In [55]:
#Overall return + CAGR

from pyspark.sql import functions as F

# Get first and last date and price
first_last = (
    df_ret_filtered
    .orderBy("Date")
    .agg(
        F.first("Date").alias("start_date"),
        F.last("Date").alias("end_date"),
        F.first("Close").alias("start_close"),
        F.last("Close").alias("end_close")
    )
)

first_last.show(truncate=False)

# Calculate total return, number of years, and CAGR
overall_metrics = (
    first_last
    .withColumn(
        "total_return_pct",                # Total return for the whole period (%)
        (F.col("end_close") / F.col("start_close") - 1) * 100
    )
    .withColumn(
        "num_years",                       # Length of period in years
        F.datediff("end_date", "start_date") / F.lit(365.25)
    )
    .withColumn(
        "cagr_pct",                        # Annual compound return (%)
        (F.pow(
            F.col("end_close") / F.col("start_close"),
            F.lit(1.0) / F.col("num_years")
        ) - 1) * 100
    )
)

print("Overall performance:")
overall_metrics.show(truncate=False)

+-------------------+-------------------+------------------+------------------+
|start_date         |end_date           |start_close       |end_close         |
+-------------------+-------------------+------------------+------------------+
|2019-12-12 00:00:00|2025-12-02 00:00:00|30.413223266601562|24.399999618530273|
+-------------------+-------------------+------------------+------------------+

Overall performance:
+-------------------+-------------------+------------------+------------------+------------------+-----------------+------------------+
|start_date         |end_date           |start_close       |end_close         |total_return_pct  |num_years        |cagr_pct          |
+-------------------+-------------------+------------------+------------------+------------------+-----------------+------------------+
|2019-12-12 00:00:00|2025-12-02 00:00:00|30.413223266601562|24.399999618530273|-19.77174071738309|5.973990417522245|-3.620395846006985|
+-------------------+-------------

In [61]:
overall_metrics_rounded = (
    overall_metrics
    .select(
        "start_date",
        "end_date",
        F.round("start_close", 2).alias("start_close"),
        F.round("end_close", 2).alias("end_close"),
        F.round("total_return_pct", 2).alias("total_return_pct"),
        F.round("num_years", 2).alias("num_years"),
        F.round("cagr_pct", 2).alias("cagr_pct")
    )
)

overall_metrics_rounded.show(truncate=False)

+-------------------+-------------------+-----------+---------+----------------+---------+--------+
|start_date         |end_date           |start_close|end_close|total_return_pct|num_years|cagr_pct|
+-------------------+-------------------+-----------+---------+----------------+---------+--------+
|2019-12-12 00:00:00|2025-12-02 00:00:00|30.41      |24.4     |-19.77          |5.97     |-3.62   |
+-------------------+-------------------+-----------+---------+----------------+---------+--------+



In [67]:
from pyspark.sql import functions as F

# Base folder for Power BI exports (relative to your home folder)
base_path = "Desktop/saudi-stocks-spark/output_powerbi"

# 1) Daily data with daily return
(
    df_ret_filtered
    .select("Date", "Close", "Volume", "daily_return_pct")
    .coalesce(1)                       # single CSV file
    .write.mode("overwrite")
    .option("header", True)
    .csv(f"{base_path}/daily_data")
)

# 2) Monthly stats
(
    monthly_stats
    .coalesce(1)
    .write.mode("overwrite")
    .option("header", True)
    .csv(f"{base_path}/monthly_stats")
)

# 3) Monthly volume
(
    monthly_volume
    .coalesce(1)
    .write.mode("overwrite")
    .option("header", True)
    .csv(f"{base_path}/monthly_volume")
)

# 4) Yearly average close
(
    yearly_avg
    .coalesce(1)
    .write.mode("overwrite")
    .option("header", True)
    .csv(f"{base_path}/yearly_avg")
)

# 5) Yearly returns
(
    year_returns
    .coalesce(1)
    .write.mode("overwrite")
    .option("header", True)
    .csv(f"{base_path}/yearly_returns")
)

# 6) Max drawdown per year
(
    year_max_dd
    .coalesce(1)
    .write.mode("overwrite")
    .option("header", True)
    .csv(f"{base_path}/year_max_drawdown")
)

# 7) Best day
best_day.coalesce(1).write.mode("overwrite").option("header", True)\
    .csv(f"{base_path}/best_day")

# 8) Worst day
worst_day.coalesce(1).write.mode("overwrite").option("header", True)\
    .csv(f"{base_path}/worst_day")

# 9) Overall performance metrics
(
    overall_metrics_rounded
    .coalesce(1)
    .write.mode("overwrite")
    .option("header", True)
    .csv(f"{base_path}/overall_metrics")
)

25/12/03 05:10:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/03 05:10:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/03 05:10:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/03 05:10:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/03 05:10:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/03 05:10:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/03 0

In [69]:
tables = {
    "prices": df,
    "daily_returns": df_ret_filtered,
    "monthly_stats": monthly_stats,
    "monthly_volume": monthly_volume,
    "yearly_avg": yearly_avg,
    "year_returns": year_returns,
    "max_drawdown": year_max_dd,
    "overall_metrics": overall_metrics
}

for name, t in tables.items():
    print(f"{name} rows:", t.count())

prices rows: 1493
daily_returns rows: 1486
monthly_stats rows: 73
monthly_volume rows: 73
yearly_avg rows: 7
year_returns rows: 7
max_drawdown rows: 7
overall_metrics rows: 1
