# Saving data for top 10 Indian Companies in CSV Format

## This code fetches historical daily stock data for the top 10 Indian companies (by market cap) from the Alpha Vantage API within a specified date range (2020-01-01 to 2024-05-31).
## It processes this data into Spark DataFrames and saves them as CSV files in a specified directory. The key steps include making API requests, filtering data by date, creating DataFrames, and writing these DataFrames as CSVs to disk.

In [None]:
import datetime
import json
import os
import urllib.request

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

# Reuse the active Spark session or start one named "AlphaVantageAPI".
spark = SparkSession.builder.appName("AlphaVantageAPI").getOrCreate()

# Prefer a key supplied through the ALPHAVANTAGE_API_KEY environment variable.
# The literal is kept only as a fallback so existing runs behave as before.
# NOTE(review): a key committed in source is exposed to every reader of this
# notebook -- rotate it and drop the fallback once the variable is configured.
api_key = os.environ.get('ALPHAVANTAGE_API_KEY', 'M38UN0AZ1TT9LTLT')

# Symbols downloaded by this notebook.
tickers = [
    'RELIANCE.BSE',
    'TCS.BSE',
    'HDFCBANK.BSE',
    'INFY.BSE',
    'ICICIBANK.BSE',
    'HINDUNILVR.BSE',
    'KOTAKBANK.BSE',
    'ITC.BSE',
    'BAJFINANCE.BSE',
    'BHARTIARTL.BSE',
]

# Inclusive date window applied to every downloaded series.
start_date = datetime.datetime(2020, 1, 1)
end_date = datetime.datetime(2024, 5, 31)

def fetch_data(ticker):
    """Download the daily price history for one ticker, limited to the configured window.

    Requests the full ``TIME_SERIES_DAILY`` series from Alpha Vantage using the
    module-level ``api_key`` and keeps only the days that fall between the
    module-level ``start_date`` and ``end_date`` (both inclusive).

    Args:
        ticker: Symbol to query, e.g. ``'RELIANCE.BSE'``.

    Returns:
        A list of ``(date, open, high, low, close, volume, ticker)`` tuples,
        with the price/volume values left exactly as they appear in the
        response, or ``None`` when the response carries no
        ``'Time Series (Daily)'`` entry.

    Raises:
        urllib.error.URLError: If the request fails.
        TimeoutError: If the server does not answer within 30 seconds.
    """
    # Local import keeps this function self-contained.
    from urllib.parse import urlencode

    # Encode the parameters so a symbol containing reserved characters
    # (for example '&') cannot split or corrupt the query string.
    query = urlencode({
        'function': 'TIME_SERIES_DAILY',
        'symbol': ticker,
        'outputsize': 'full',
        'apikey': api_key,
    })
    url = f'https://www.alphavantage.co/query?{query}'

    # Bound the wait: without a timeout a stalled connection blocks forever.
    with urllib.request.urlopen(url, timeout=30) as response:
        data = json.loads(response.read().decode())

    if 'Time Series (Daily)' not in data:
        print(f"No data found for {ticker}")
        if isinstance(data, dict):
            # Show which fields came back instead, as a hint to the cause.
            # Only the keys are printed; the values might echo request
            # details -- TODO confirm before logging them.
            print(f"  Response fields for {ticker}: {sorted(data)}")
        return None

    time_series = data['Time Series (Daily)']

    # Keep only the days inside [start_date, end_date].
    return [
        (
            day,
            metrics['1. open'],
            metrics['2. high'],
            metrics['3. low'],
            metrics['4. close'],
            metrics['5. volume'],
            ticker,
        )
        for day, metrics in time_series.items()
        if start_date <= datetime.datetime.strptime(day, '%Y-%m-%d') <= end_date
    ]

def create_dataframe(data, schema):
    """Build a Spark DataFrame from ``data`` with the given ``schema``.

    Uses the notebook-level ``spark`` session.
    """
    frame = spark.createDataFrame(data, schema=schema)
    return frame

# Column names, listed in the same order as the tuples built by fetch_data:
# (date, open, high, low, close, volume, ticker). The names are assigned by
# position, so any other order labels the values with the wrong column.
schema = ['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Company']

# Directory that receives one CSV dataset per ticker.
storage_path = 'Files/Raw_Historical'

for ticker in tickers:
    print(f"Fetching data for {ticker}")
    data = fetch_data(ticker)
    if data:
        df = create_dataframe(data, schema)
        # Write a header row: the conversion cell reads these files with
        # header=true, which would otherwise swallow the first data row.
        df.write.mode('overwrite').option('header', 'true').csv(f'{storage_path}/{ticker}.csv')
    else:
        # Covers both a missing series (None) and an empty date window ([]).
        print(f"No data to write for {ticker}")

# Accessing the CSVs and Setting the schema and saving them in Parquet format

## this code converts the historical stock data from CSV format to Parquet format for each company listed in the tickers array. 
## Parquet is a columnar storage file format which is more efficient for querying and storage compared to CSV.

In [3]:
from pyspark.sql.types import *

# Symbols whose raw CSV exports are converted in this cell.
tickers = [
    'RELIANCE.BSE',
    'TCS.BSE',
    'HDFCBANK.BSE',
    'INFY.BSE',
    'ICICIBANK.BSE',
    'HINDUNILVR.BSE',
    'KOTAKBANK.BSE',
    'ITC.BSE',
    'BAJFINANCE.BSE',
    'BHARTIARTL.BSE',
]

# Typed column layout used when the raw CSV files are read back.
_csv_fields = [
    ("Date", DateType()),
    ("Open", FloatType()),
    ("High", FloatType()),
    ("Low", FloatType()),
    ("Close", FloatType()),
    ("Volume", IntegerType()),
    ("Company", StringType()),
]
Schema = StructType(
    [StructField(field_name, field_type) for field_name, field_type in _csv_fields]
)
for ticker in tickers:
    # Parse the raw CSV with the explicit schema so columns get proper types.
    df = (
        spark.read.format("csv")
        .option("header", "true")
        .schema(Schema)
        .load(f'Files/Raw_Historical/{ticker}.csv')
    )

    # Write under Files/Raw_Historical_parquet, the directory the Delta-table
    # cell loads with 'Files/Raw_Historical_parquet/*.parquet'. Writing next to
    # the CSVs in Files/Raw_Historical left that glob with nothing to read.
    df.write.mode('overwrite').parquet(f'Files/Raw_Historical_parquet/{ticker}.parquet')

StatementMeta(, 60466069-2011-4b34-a4b9-82e957d19193, 5, Finished, Available, Finished)

# Now creating a Delta table from all these Parquet files, named top10companies, which is partitioned by company name

## The code reads all Parquet files from the specified directory into a Spark DataFrame which is partitioned by the "Company" column to optimize query performance.
## The partitioned DataFrame is written to a Delta table named "top10companies", overwriting any existing data in this table.

In [1]:
# Load every per-company Parquet dataset matched by the glob into one DataFrame.
parquet_glob = 'Files/Raw_Historical_parquet/*.parquet'
df1 = spark.read.parquet(parquet_glob)

# Replace the "top10companies" Delta table, partitioned by the Company column.
(
    df1.write
    .format("delta")
    .mode("overwrite")
    .partitionBy("Company")
    .saveAsTable("top10companies")
)

StatementMeta(, 5e9fe5a8-fe64-4da7-a006-bcd91ae93fc2, 3, Finished, Available, Finished)

# Querying data from delta table

In [5]:
# Number of non-null Date values per company, limited to 10 result rows.
company_counts_query = "SELECT Company, COUNT(Date) FROM Atlys_Lakehouse.top10companies GROUP BY Company LIMIT 10"
df = spark.sql(company_counts_query)
display(df)

StatementMeta(, 5e9fe5a8-fe64-4da7-a006-bcd91ae93fc2, 7, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, f2c079e9-ea1b-43d2-ba26-09b47a00830f)

# Optimizing delta table using ZORDER by Date

# WHY ZORDER and NOT PARTITIONING?

## Z-Ordering is a technique to colocate related information in the same set of files. This co-locality is automatically used by Delta Lake on Databricks data-skipping algorithms to dramatically reduce the amount of data that needs to be read. The syntax is `OPTIMIZE <table> ZORDER BY (<column>)`, as used in the cell below.

## If you expect a column to be commonly used in query predicates and if that column has high cardinality (that is, a large number of distinct values) which might make it ineffective for PARTITIONing the table by, then use ZORDER BY instead (ex:- a table containing companies, dates where you might want to partition by company and z-order by date assuming that table collects data for several years)

In [None]:
%%sql
-- Optimize the top10companies Delta table, Z-ordering its data files by Date.
OPTIMIZE top10companies ZORDER BY (Date);

StatementMeta(, 07d067e9-15f2-479e-b501-a8079b2cd114, 4, Finished, Available, Finished)

<Spark SQL result set with 1 rows and 2 fields>

# Company Wise Daily Variation of Prices

In [11]:
%%sql
-- Daily price range (High - Low) for every company and trading day
-- between 2020-01-01 and 2024-05-31, both inclusive.
SELECT Date,
       Company,
       Open,
       Close,
       (High - Low) AS DailyPriceVariation
FROM top10companies
WHERE Date BETWEEN '2020-01-01' AND '2024-05-31';


StatementMeta(, 990cb260-3ae1-4da3-a393-35e0365a7b61, 13, Finished, Available, Finished)

<Spark SQL result set with 1000 rows and 5 fields>

# Company Wise Daily Volume Change

In [None]:
%%sql
-- Day-over-day change in traded volume per company, 2020-01-01 to 2024-05-31.
-- The date filter is applied before the window is evaluated, so the first row
-- of each company in the range has NULL PreviousVolume and VolumeChange.
SELECT Company,
       Date,
       Volume,
       LAG(Volume) OVER company_by_date AS PreviousVolume,
       Volume - LAG(Volume) OVER company_by_date AS VolumeChange
FROM top10companies
WHERE Date BETWEEN '2020-01-01' AND '2024-05-31'
WINDOW company_by_date AS (PARTITION BY Company ORDER BY Date);


StatementMeta(, 07d067e9-15f2-479e-b501-a8079b2cd114, 6, Finished, Available, Finished)

<Spark SQL result set with 1000 rows and 5 fields>

# Calculate the median daily variation using approx_percentile(..., 0.5), Spark SQL's approximate median

In [18]:
%%sql
-- Approximate median (50th percentile) of the daily range High - Low,
-- computed per (Date, Company) group and listed newest date first.
-- NOTE(review): the table is loaded from a daily series, so each
-- (Date, Company) group presumably holds a single row and the "median" is
-- just that row's High - Low. Confirm whether grouping by Company alone
-- (or by a longer period) was intended.
SELECT
  Date,
  Company,
  approx_percentile(High - Low, 0.5) AS MedianDailyVariation
FROM
  top10companies
GROUP BY
  Date, Company
ORDER BY Date DESC, Company;


StatementMeta(, 990cb260-3ae1-4da3-a393-35e0365a7b61, 20, Finished, Available, Finished)

<Spark SQL result set with 1000 rows and 3 fields>

# Saving the query data as a Delta Table to further increase performance rather than a view BECAUSE the following queries have to run on a regular interval:
##    1. Company Wise Daily Variation of Prices
##    2. Company Wise Daily Volume Change
##    3. Median Daily Variation

# for Company Wise Daily Variation of Prices the table name is 'Company_Wise_Daily_Variation'
## The purpose of this code is to create a new Delta table (Company_Wise_Daily_Variation) that contains the daily variation of stock prices (High - Low), together with the Open, Close, High and Low prices, for each company on each date. This table can be used for further analysis, querying, and reporting, providing insights into the daily price changes for each company over time.

In [14]:
# Per-row daily variation (High - Low) alongside the price columns,
# taken from top10companies without any date filter.
daily_variation_sql = """
SELECT Company, 
    CAST(Date AS DATE), 
    Close, 
    Open, 
    Low, 
    High, 
    (High - Low) AS DailyVariation 
FROM top10companies
"""
data = spark.sql(daily_variation_sql)

# Replace the Company_Wise_Daily_Variation Delta table with the query result.
variation_writer = data.write.format("delta").mode("overwrite")
variation_writer.saveAsTable('Company_Wise_Daily_Variation')

StatementMeta(, 990cb260-3ae1-4da3-a393-35e0365a7b61, 16, Finished, Available, Finished)

# for Company Wise Daily Volume Change the table name is 'Company_Wise_Daily_Volume_Change'
## The purpose of this code is to create a new Delta table (Company_Wise_Daily_Volume_Change) that contains the daily volume and the change in volume for each company over time within the specified date range. This new table can be used for further analysis, querying, and reporting, providing insights into the daily trading volume changes for each company.

In [3]:
# Volume, previous trading day's volume, and their difference per company,
# restricted to 2020-01-01 .. 2024-05-31 (inclusive).
volume_change_sql = """
SELECT Company, 
        Date, 
        Volume, 
        LAG(Volume) OVER (PARTITION BY Company ORDER BY Date) AS PreviousVolume,  
        Volume - LAG(Volume) OVER (PARTITION BY Company ORDER BY Date) AS VolumeChange 
FROM top10companies 
WHERE Date BETWEEN '2020-01-01' AND '2024-05-31';
"""
data = spark.sql(volume_change_sql)

# Replace the Company_Wise_Daily_Volume_Change Delta table with the result.
volume_writer = data.write.format("delta").mode("overwrite")
volume_writer.saveAsTable('Company_Wise_Daily_Volume_Change')

StatementMeta(, 990cb260-3ae1-4da3-a393-35e0365a7b61, 5, Finished, Available, Finished)

# for Median Daily Variation the table name is 'Median_Daily_Variation'
## The purpose of this code is to create a new Delta table (Median_Daily_Variation) that contains the median daily variation of stock prices for each company on each date. This new table can be used for further analysis, querying, and reporting, providing insights into the typical daily price changes for each company over time.

In [20]:
# Approximate median of High - Low per (Date, Company) group, newest first.
# NOTE(review): the table is loaded from a daily series, so each group
# presumably contains one row and the median equals that row's High - Low.
# Confirm whether a coarser grouping (e.g. Company only) was intended.
median_variation_sql = """
SELECT
  Date,
  Company,
  approx_percentile(High - Low, 0.5) AS MedianDailyVariation
FROM
  top10companies
GROUP BY
  Date, Company
ORDER BY Date DESC, Company;
"""
data = spark.sql(median_variation_sql)

# Replace the Median_Daily_Variation Delta table with the query result.
median_writer = data.write.format("delta").mode("overwrite")
median_writer.saveAsTable('Median_Daily_Variation')

StatementMeta(, 990cb260-3ae1-4da3-a393-35e0365a7b61, 22, Finished, Available, Finished)