### Example Exploratory Notebook

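Use this notebook to explore the data generated by the pipeline in your preferred programming language. The cells below download the current NIFTY 50 constituents from the NSE India API into a pandas DataFrame and append them to the `workspace.NSE_SET.NIFTY50_DAILY` table.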

**Note**: This notebook is not executed as part of the pipeline.

In [0]:
from pyspark.sql import SparkSession

# Obtain the Spark session for this notebook (getOrCreate reuses an active
# session if one already exists) and apply the SQL settings used below.
spark = (
    SparkSession.builder
    .appName("NSE_Daily_Data")
    # Number of partitions Spark SQL uses when shuffling data.
    .config("spark.sql.shuffle.partitions", "200")
    # Use Apache Arrow for pandas <-> Spark DataFrame conversion.
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .getOrCreate()
)

In [0]:
import requests
import pandas as pd
from datetime import datetime

# NSE endpoint queried for the "NIFTY 50" index (the index name is URL-encoded).
url = "https://www.nseindia.com/api/equity-stockIndices?index=NIFTY%2050"

# Browser-like request headers sent with both calls below.
headers = {
    "User-Agent": "Mozilla/5.0",
    "Accept": "application/json",
    "Referer": "https://www.nseindia.com"
}

# Upper bound (seconds) for each HTTP call; without a timeout, requests waits
# forever on a stalled connection and the notebook hangs.
REQUEST_TIMEOUT = 30

session = requests.Session()
# Hit the home page first so the session picks up the site's cookies.
session.get("https://www.nseindia.com", headers=headers, timeout=REQUEST_TIMEOUT)

r = session.get(url, headers=headers, timeout=REQUEST_TIMEOUT)

print("Status:", r.status_code)
# Stop on 4xx/5xx with a clear HTTP error instead of a JSON decode error on an
# error page.
r.raise_for_status()

data = r.json()
# Guard against a payload without rows; an empty frame would only fail later,
# during Spark schema inference, with a much less obvious message.
records = data.get("data") if isinstance(data, dict) else None
if not records:
    raise ValueError("NSE response contains no 'data' records; nothing to load")

df = pd.DataFrame(records)
# Stamp every row with today's date as a 'YYYY-MM-DD' string.
# NOTE(review): datetime.today() uses the driver's local timezone -- confirm it
# matches the intended (IST) trading date.
df["date"] = datetime.today().strftime('%Y-%m-%d')
display(df)

In [0]:
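# Reference DDL for the target schema/table (commented out; presumably the table already exists -- confirm before re-enabling).
# # spark.sql("CREATE SCHEMA IF NOT EXISTS workspace.NSE_SET")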
# spark.sql("""
#     CREATE TABLE IF NOT EXISTS workspace.NSE_SET.NIFTY50_DAILY (
#         priority              BIGINT,
#         symbol                STRING,
#         identifier            STRING,
#         open                  DOUBLE,
#         dayHigh               DOUBLE,
#         dayLow                DOUBLE,
#         lastPrice             DOUBLE,
#         previousClose         DOUBLE,
#         change                DOUBLE,
#         pChange               DOUBLE,
#         ffmc                  DOUBLE,
#         yearHigh              DOUBLE,
#         yearLow               DOUBLE,
#         totalTradedVolume     BIGINT,
#         stockIndClosePrice    BIGINT,
#         totalTradedValue      DOUBLE,
#         lastUpdateTime        STRING,
#         nearWKH               DOUBLE,
#         nearWKL               DOUBLE,
#         perChange365d         DOUBLE,
#         perChange30d          DOUBLE,
#         date365dAgo           STRING,
#         date30dAgo            STRING,
#         chartTodayPath        STRING,
#         chart30dPath          STRING,
#         chart365dPath         STRING,
#         series                STRING,
#         meta                  STRING,
#         trade_date            date,
#         load_date             date
#     )
# """)

In [0]:
from pyspark.sql.functions import lit, current_date
from pyspark.sql.types import StringType

# Fully qualified name of the table this cell appends to.
TARGET_TABLE = "workspace.NSE_SET.NIFTY50_DAILY"

# Ensure 'meta' column is cast to string to avoid a type inference error when
# the pandas frame is converted to a Spark DataFrame.
if "meta" in df.columns:
    df["meta"] = df["meta"].astype(str)

spark_df = spark.createDataFrame(df).withColumnRenamed("date", "trade_date")
spark_df = (
    spark_df
    # 'date' was built as a 'YYYY-MM-DD' string; cast it explicitly rather than
    # relying on an implicit string -> date cast at write time.
    .withColumn("trade_date", spark_df["trade_date"].cast("date"))
    .withColumn("load_date", current_date())
)

# insertInto() matches columns by POSITION and ignores their names, while the
# DataFrame's column order comes from the JSON key order of the API response.
# Re-select the columns in the table's own order so a reordered payload cannot
# land values in the wrong columns; a column missing from the payload fails
# here, by name, instead of being written misaligned.
spark_df = spark_df.select(*spark.table(TARGET_TABLE).columns)

# NOTE: with the delete below disabled, re-running this cell on the same day
# appends a second copy of the day's rows.
# Delete rows with the same load_date before inserting new data
# load_date_value = spark_df.select("load_date").first()
# if load_date_value is not None:
#     load_date_value = load_date_value["load_date"]
#     spark.sql(f"DELETE FROM workspace.NSE_SET.NIFTY50_DAILY WHERE load_date = DATE('{load_date_value}')")

spark_df.write.mode("append").insertInto(TARGET_TABLE)