In [0]:
import os
from functools import reduce

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import input_file_name, lit
from pyspark.sql.types import DoubleType, FloatType, IntegerType, StringType, StructField, StructType

# Obtain the Spark session: getOrCreate() returns the already-active session if
# there is one, otherwise it builds a new one with this application name.
spark = (
    SparkSession.builder
    .appName("ReadMultipleCSVFiles")
    .getOrCreate()
)

In [0]:
# Explicit schema for the per-symbol price CSVs (Date, OHLC prices, Adj Close, Volume).
# NOTE: this schema is not currently passed to any reader -- dataframe() below
# reads with inferSchema=True.
# All numeric columns are DoubleType:
#   - Volume: the loaded data shows values like 62546300.0, i.e. a decimal part.
#     IntegerType cannot parse those (they would become null under the default
#     PERMISSIVE read mode), and int32 also overflows above 2,147,483,647.
#   - Prices: 32-bit FloatType would round the double-precision values.
schema = StructType([
    StructField("Date", StringType(), True),
    StructField("Open", DoubleType(), True),
    StructField("High", DoubleType(), True),
    StructField("Low", DoubleType(), True),
    StructField("Close", DoubleType(), True),
    StructField("Adj Close", DoubleType(), True),
    StructField("Volume", DoubleType(), True),
])


In [0]:
# Define the folder paths
folder_path_etf = "dbfs:/FileStore/etfs"
folder_path_stocks = "dbfs:/FileStore/stocks"

# Output column order shared by every per-symbol price frame.
PRICE_COLUMNS = ["Symbol", "Date", "Open", "High", "Low", "Close", "Adj Close", "Volume"]


def dataframe(folder_path):
    """Load every ``.csv`` file directly under ``folder_path`` into one DataFrame.

    Each file is read with a header row and an inferred schema. It is tagged with
    a ``Symbol`` column taken from its file name (``"AAPL.csv"`` -> ``"AAPL"``)
    and projected onto ``PRICE_COLUMNS``. The per-file frames are then combined
    with ``unionByName``, so a file whose columns appear in a different order
    cannot silently shift values into the wrong column (plain ``union`` matches
    columns by position).

    Args:
        folder_path: DBFS folder to scan. Only its direct children are listed.

    Returns:
        A Spark DataFrame with the columns in ``PRICE_COLUMNS``.

    Raises:
        ValueError: if the folder contains no ``.csv`` files.
    """
    # dbutils.fs.ls lists the folder's direct children; keep only the CSV files.
    csv_files = [f.path for f in dbutils.fs.ls(folder_path) if f.path.endswith(".csv")]
    if not csv_files:
        raise ValueError(f"No .csv files found in {folder_path}")

    # NOTE(review): this performs one read (plus schema inference) per file and
    # builds a union chain as deep as the file count. If load time matters for
    # large folders, consider a single multi-path read instead.
    frames = []
    for csv_file in csv_files:
        symbol = os.path.splitext(os.path.basename(csv_file))[0]
        frames.append(
            spark.read.csv(csv_file, header=True, inferSchema=True)
            .withColumn("Symbol", lit(symbol))
            .select(*PRICE_COLUMNS)
        )
    return reduce(lambda left, right: left.unionByName(right), frames)


etf_df = dataframe(folder_path_etf)
stocks_df = dataframe(folder_path_stocks)

In [0]:
# Stack stock rows followed by ETF rows. Both frames are built by dataframe()
# with the same column order, so positional UNION ALL lines the columns up.
combined_etf_stocks = stocks_df.unionAll(etf_df)
combined_etf_stocks.display()

Symbol,Date,Open,High,Low,Close,Adj Close,Volume
A,1999-11-18,32.54649353027344,35.765380859375,28.612302780151367,31.473533630371094,27.06866455078125,62546300.0
A,1999-11-19,30.713520050048828,30.75822639465332,28.47818374633789,28.880542755126957,24.83857727050781,15234100.0
A,1999-11-22,29.551143646240234,31.473533630371094,28.65700912475586,31.473533630371094,27.06866455078125,6577800.0
A,1999-11-23,30.40057182312012,31.205293655395508,28.612302780151367,28.612302780151367,24.607879638671875,5975600.0
A,1999-11-24,28.701717376708984,29.998210906982425,28.612302780151367,29.372318267822266,25.261524200439453,4843200.0
A,1999-11-26,29.23819732666016,29.685264587402344,29.14878463745117,29.46173095703125,25.338428497314453,1729400.0
A,1999-11-29,29.32761001586914,30.355865478515625,29.014663696289062,30.13233184814453,25.91516876220703,4074700.0
A,1999-11-30,30.042919158935547,30.713520050048828,29.28290367126465,30.177038192749023,25.9536190032959,4310000.0
A,1999-12-01,30.177038192749023,31.0711727142334,29.95350456237793,30.713520050048828,26.41501235961914,2957300.0
A,1999-12-02,31.29470634460449,32.1888427734375,30.8923454284668,31.56294631958008,27.14556312561035,3069800.0


In [0]:
# Symbol metadata table. No schema is given and inferSchema is left off, so every
# column (including ETF) is loaded as a string.
maindf = spark.read.csv("dbfs:/FileStore/main/symbols_valid_meta.csv", header=True)


In [0]:
# Attach symbol metadata to each price row. The inner join drops price rows whose
# Symbol has no match in maindf.
joined_df = combined_etf_stocks.join(maindf, on="Symbol", how="inner")

# Keep the price columns plus the descriptive name and the ETF flag.
final_df = joined_df.select(
    "Symbol",
    "Security Name",
    "Date",
    "Open",
    "High",
    "Low",
    "Close",
    "Adj Close",
    "Volume",
    "ETF",
)

In [0]:
# Persist the intermediate dataset (prices joined with symbol metadata, before
# the rolling-window features are added).
# Spark writes a *directory* at this path containing part-*.csv files.
output_path = "dbfs:/FileStore/combined_csv/mid_output.csv"

# Write final_df. net_df is only defined further down (by the rolling-median
# cell), so referencing it here raises NameError on a fresh top-to-bottom run.
(
    final_df.write
    .format("csv")
    .option("header", "true")
    .mode("overwrite")
    .option("delimiter", ",")
    .save(output_path)
)

In [0]:
# Expose the joined price/metadata frame to Spark SQL for the moving-average query.
final_df.createOrReplaceTempView(name="final_dataset")

In [0]:
# vol_moving_avg: per Symbol, the mean Volume of rows dated from 31 days before
# up to 1 day before the current row's Date (the current day is excluded).
# A row with no earlier rows in that window gets NULL.
final_df = q1_result = spark.sql ("select *,AVG(Volume) OVER (partition by Symbol ORDER BY CAST(Date AS timestamp)  RANGE BETWEEN INTERVAL 31 DAYS PRECEDING AND INTERVAL 1 DAY PRECEDING) as vol_moving_avg  from final_dataset")

In [0]:
# Register the frame that now carries vol_moving_avg for the rolling-median query.
final_df.createOrReplaceTempView(name="final_dataset2")

In [0]:
# adj_close_rolling_med: per Symbol, the approximate median (percentile_approx at
# 0.5) of `Adj Close` over the same trailing window as vol_moving_avg
# (31 days before up to 1 day before the current row's Date).
net_df = q2_result = spark.sql("SELECT *,  percentile_approx(`Adj Close`, 0.5) OVER (partition by Symbol ORDER BY CAST(Date AS timestamp)  RANGE BETWEEN INTERVAL 31 DAYS PRECEDING AND INTERVAL 1 DAY PRECEDING ) AS adj_close_rolling_med from final_dataset2")

In [0]:
output_path = "dbfs:/FileStore/combined_csv/output.csv"

# Persist the final feature table (prices + metadata + both rolling columns).
# Spark writes a directory at output_path containing part-*.csv files.
(
    net_df.write
    .mode("overwrite")
    .format("csv")
    .option("header", "true")
    .option("delimiter", ",")
    .save(output_path)
)

[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-1509883574619226>:4[0m
[1;32m      1[0m output_path [38;5;241m=[39m [38;5;124m"[39m[38;5;124mdbfs:/FileStore/combined_csv/output.csv[39m[38;5;124m"[39m
[1;32m      3[0m [38;5;66;03m# write the DataFrame to a CSV file[39;00m
[0;32m----> 4[0m net_df[38;5;241m.[39mwrite[38;5;241m.[39mformat([38;5;124m"[39m[38;5;124mcsv[39m[38;5;124m"[39m) \
[1;32m      5[0m   [38;5;241m.[39moption([38;5;124m"[39m[38;5;124mheader[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124mtrue[39m[38;5;124m"[39m) \
[1;32m      6[0m   [38;5;241m.[39mmode([38;5;124m"[39m[38;5;124moverwrite[39m[38;5;124m"[39m) \
[1;32m      7[0m   [38;5;241m.[39moption([38;5;124m"[39m[38;5;124mdelimiter[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124m,[39m[38;5;124m"[39m) \
[1;32m   

In [0]:
# Render the final feature table (joined prices/metadata plus vol_moving_avg and
# adj_close_rolling_med) using the Databricks notebook DataFrame.display() method.
net_df.display()