In [8]:
from pyspark.sql import SparkSession


spark = SparkSession.builder \
    .appName("NiftyStockAnalysis") \
    .getOrCreate()

spark


In [9]:
from pyspark.sql import functions as F


df = spark.read.csv("/home/jovyan/work/data/*.csv", header=True, inferSchema=True)


df.printSchema()
df.show(5)


root
 |-- Date: string (nullable = true)
 |-- Symbol: string (nullable = true)
 |-- Series: string (nullable = true)
 |-- Prev Close: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Last: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- VWAP: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Turnover: double (nullable = true)
 |-- Trades: double (nullable = true)
 |-- Deliverable Volume: double (nullable = true)
 |-- %Deliverble: double (nullable = true)

+----------+----------+------+----------+------+------+-----+-----+------+------+--------+-----------------+------+------------------+-----------+
|      Date|    Symbol|Series|Prev Close|  Open|  High|  Low| Last| Close|  VWAP|  Volume|         Turnover|Trades|Deliverable Volume|%Deliverble|
+----------+----------+------+----------+------+------+-----+-----+------+------+--------+-----------------+------+-----

In [10]:
from pyspark.sql.window import Window

# 1. Convert Date column to proper date type
df = df.withColumn("Date", F.to_date("Date", "yyyy-MM-dd"))

# 2. Define window per stock, ordered by Date
window_spec = Window.partitionBy("Symbol").orderBy("Date")

# 3. Previous day's close
df = df.withColumn("Prev_Close", F.lag("Close").over(window_spec))

# 4. Daily return = % change
df = df.withColumn(
    "Daily_Return",
    ((F.col("Close") - F.col("Prev_Close")) / F.col("Prev_Close")) * 100
)


df.select("Date", "Symbol", "Close", "Prev_Close", "Daily_Return").show(10)


+----------+----------+------+----------+-------------------+
|      Date|    Symbol| Close|Prev_Close|       Daily_Return|
+----------+----------+------+----------+-------------------+
|2008-05-26|BAJAJ-AUTO|604.75|      NULL|               NULL|
|2008-05-26|BAJAJ-AUTO|604.75|    604.75|                0.0|
|2008-05-27|BAJAJ-AUTO|593.15|    604.75|-1.9181479950392761|
|2008-05-27|BAJAJ-AUTO|593.15|    593.15|                0.0|
|2008-05-28|BAJAJ-AUTO|608.15|    593.15| 2.5288712804518254|
|2008-05-28|BAJAJ-AUTO|608.15|    608.15|                0.0|
|2008-05-29|BAJAJ-AUTO|599.45|    608.15|-1.4305681164186357|
|2008-05-29|BAJAJ-AUTO|599.45|    599.45|                0.0|
|2008-05-30|BAJAJ-AUTO| 571.7|    599.45|-4.6292434731837515|
|2008-05-30|BAJAJ-AUTO| 571.7|     571.7|                0.0|
+----------+----------+------+----------+-------------------+
only showing top 10 rows



In [11]:

df = df.withColumn("Shock_Day", F.when(F.col("Daily_Return") < -2, 1).otherwise(0))
window_spec = Window.partitionBy("Symbol").orderBy("Date")

df = df.withColumn("Next_Day_Return", F.lead("Daily_Return").over(window_spec))
df = df.withColumn("Bounce_Back", F.when(F.col("Next_Day_Return") > 0, 1).otherwise(0))


df.select("Date", "Symbol", "Close", "Daily_Return", "Shock_Day", "Next_Day_Return", "Bounce_Back").show(15)


+----------+----------+------+--------------------+---------+--------------------+-----------+
|      Date|    Symbol| Close|        Daily_Return|Shock_Day|     Next_Day_Return|Bounce_Back|
+----------+----------+------+--------------------+---------+--------------------+-----------+
|2008-05-26|BAJAJ-AUTO|604.75|                NULL|        0|                 0.0|          0|
|2008-05-26|BAJAJ-AUTO|604.75|                 0.0|        0| -1.9181479950392761|          0|
|2008-05-27|BAJAJ-AUTO|593.15| -1.9181479950392761|        0|                 0.0|          0|
|2008-05-27|BAJAJ-AUTO|593.15|                 0.0|        0|  2.5288712804518254|          1|
|2008-05-28|BAJAJ-AUTO|608.15|  2.5288712804518254|        0|                 0.0|          0|
|2008-05-28|BAJAJ-AUTO|608.15|                 0.0|        0| -1.4305681164186357|          0|
|2008-05-29|BAJAJ-AUTO|599.45| -1.4305681164186357|        0|                 0.0|          0|
|2008-05-29|BAJAJ-AUTO|599.45|                 0.0

In [13]:

df = df.withColumn("Shock_Day", F.when(F.col("Daily_Return") < -2, 1).otherwise(0))

window_spec = Window.partitionBy("Symbol").orderBy("Date")

df = df.withColumn("Next_Day_Return", F.lead("Daily_Return").over(window_spec))

df = df.withColumn("Bounce_Back", F.when(F.col("Next_Day_Return") > 0, 1).otherwise(0))

df.select("Date", "Symbol", "Close", "Daily_Return", "Shock_Day", "Next_Day_Return", "Bounce_Back").show(15)


+----------+----------+------+--------------------+---------+--------------------+-----------+
|      Date|    Symbol| Close|        Daily_Return|Shock_Day|     Next_Day_Return|Bounce_Back|
+----------+----------+------+--------------------+---------+--------------------+-----------+
|2008-05-26|BAJAJ-AUTO|604.75|                NULL|        0|                 0.0|          0|
|2008-05-26|BAJAJ-AUTO|604.75|                 0.0|        0| -1.9181479950392761|          0|
|2008-05-27|BAJAJ-AUTO|593.15| -1.9181479950392761|        0|                 0.0|          0|
|2008-05-27|BAJAJ-AUTO|593.15|                 0.0|        0|  2.5288712804518254|          1|
|2008-05-28|BAJAJ-AUTO|608.15|  2.5288712804518254|        0|                 0.0|          0|
|2008-05-28|BAJAJ-AUTO|608.15|                 0.0|        0| -1.4305681164186357|          0|
|2008-05-29|BAJAJ-AUTO|599.45| -1.4305681164186357|        0|                 0.0|          0|
|2008-05-29|BAJAJ-AUTO|599.45|                 0.0

In [14]:
# Only consider rows that ARE shock days and have a valid next day
shocks = df.filter((F.col("Shock_Day") == 1) & F.col("Next_Day_Return").isNotNull())

overall = shocks.agg(
    F.count("*").alias("num_shocks"),
    F.sum("Bounce_Back").alias("num_bounces"),
    (F.avg(F.col("Bounce_Back")) * 100).alias("bounce_rate_percent")
)

overall.show(truncate=False)


+----------+-----------+-------------------+
|num_shocks|num_bounces|bounce_rate_percent|
+----------+-----------+-------------------+
|32088     |0          |0.0                |
+----------+-----------+-------------------+



In [15]:
per_stock = shocks.groupBy("Symbol").agg(
    F.count("*").alias("num_shocks"),
    F.sum("Bounce_Back").alias("num_bounces"),
    (F.avg("Bounce_Back") * 100).alias("bounce_rate_percent")
).orderBy(F.col("bounce_rate_percent").desc())

print("Top 10 symbols by bounce rate:")
per_stock.show(10, truncate=False)

print("Bottom 10 symbols by bounce rate:")
per_stock.orderBy(F.col("bounce_rate_percent").asc()).show(10, truncate=False)


Top 10 symbols by bounce rate:
+----------+----------+-----------+-------------------+
|Symbol    |num_shocks|num_bounces|bounce_rate_percent|
+----------+----------+-----------+-------------------+
|ASIANPAINT|415       |0          |0.0                |
|AXISBANK  |552       |0          |0.0                |
|ADANIPORTS|324       |0          |0.0                |
|BAJAJFINSV|444       |0          |0.0                |
|BAJAJ-AUTO|291       |0          |0.0                |
|BRITANNIA |404       |0          |0.0                |
|BAJFINANCE|305       |0          |0.0                |
|GAIL      |679       |0          |0.0                |
|BAJAUTOFIN|444       |0          |0.0                |
|HCLTECH   |874       |0          |0.0                |
+----------+----------+-----------+-------------------+
only showing top 10 rows

Bottom 10 symbols by bounce rate:
+----------+----------+-----------+-------------------+
|Symbol    |num_shocks|num_bounces|bounce_rate_percent|
+----------+-

In [16]:
per_stock.coalesce(1).write.mode("overwrite").option("header", True).csv("/home/jovyan/work/output/per_stock_bounce")
overall.coalesce(1).write.mode("overwrite").option("header", True).csv("/home/jovyan/work/output/overall_bounce")


In [17]:

shocks = df.filter((F.col("Shock_Day") == 1) & F.col("Next_Day_Return").isNotNull())


overall = shocks.agg(
    F.count("*").alias("num_shocks"),
    F.sum("Bounce_Back").alias("num_bounces"),
    (F.avg("Bounce_Back") * 100).alias("bounce_rate_percent")
)

overall.show(truncate=False)


+----------+-----------+-------------------+
|num_shocks|num_bounces|bounce_rate_percent|
+----------+-----------+-------------------+
|32088     |0          |0.0                |
+----------+-----------+-------------------+



In [18]:
per_stock = shocks.groupBy("Symbol").agg(
    F.count("*").alias("num_shocks"),
    F.sum("Bounce_Back").alias("num_bounces"),
    (F.avg("Bounce_Back") * 100).alias("bounce_rate_percent")
).orderBy(F.col("bounce_rate_percent").desc())

print("Top 10 symbols by bounce rate:")
per_stock.show(10, truncate=False)

print("Bottom 10 symbols by bounce rate:")
per_stock.orderBy(F.col("bounce_rate_percent").asc()).show(10, truncate=False)


Top 10 symbols by bounce rate:
+----------+----------+-----------+-------------------+
|Symbol    |num_shocks|num_bounces|bounce_rate_percent|
+----------+----------+-----------+-------------------+
|ASIANPAINT|415       |0          |0.0                |
|AXISBANK  |552       |0          |0.0                |
|BAJAJ-AUTO|291       |0          |0.0                |
|BAJAJFINSV|444       |0          |0.0                |
|ADANIPORTS|324       |0          |0.0                |
|BRITANNIA |404       |0          |0.0                |
|CIPLA     |569       |0          |0.0                |
|GAIL      |679       |0          |0.0                |
|BAJAUTOFIN|444       |0          |0.0                |
|HCLTECH   |874       |0          |0.0                |
+----------+----------+-----------+-------------------+
only showing top 10 rows

Bottom 10 symbols by bounce rate:
+----------+----------+-----------+-------------------+
|Symbol    |num_shocks|num_bounces|bounce_rate_percent|
+----------+-

In [20]:
out_base = "/home/jovyan/work/output"

overall.coalesce(1).write.mode("overwrite").parquet(f"{out_base}/overall_bounce.parquet")
overall.toPandas().to_csv(f"{out_base}/overall_bounce.csv", index=False)
per_stock.coalesce(1).write.mode("overwrite").parquet(f"{out_base}/per_stock_bounce.parquet")
per_stock.toPandas().to_csv(f"{out_base}/per_stock_bounce.csv", index=False)

print("Results saved to /output as both CSV and Parquet ")


Results saved to /output as both CSV and Parquet 


In [21]:
import os
print(os.listdir(out_base))


['overall_bounce', 'overall_bounce.csv', 'overall_bounce.parquet', 'per_stock_bounce', 'per_stock_bounce.csv', 'per_stock_bounce.parquet']
