## 

## Import necessary functions

In [35]:
import time
from pyspark.sql.functions import col, max, year, month


StatementMeta(prdneudwcesyns, 36, 36, Finished, Available, Finished)

### Read CSV file

In [None]:
df = spark.read.load(
    'abfss://filesystem@flumenbaron.dfs.core.windows.net/synapse/workspaces/warehouse/all_stock_data.csv',
    format='csv',
    header=True,
    inferSchema=True
)

StatementMeta(prdneudwcesyns, 36, 37, Finished, Available, Finished)

## Count the rows in the table

In [37]:
df.count()

StatementMeta(prdneudwcesyns, 36, 38, Finished, Available, Finished)

34646258

## Aggregations without optimization

In [38]:
df2 = df.select("Date", "Ticker", "High")\
        .withColumn("Year", year(col("Date")))\
        .withColumn("Month", month(col("Date")))

start_time = time.time()

df = df.select("Date", "Ticker", "High").withColumn("Year", year(col("Date"))).withColumn("Month", month(col("Date")))

# Group by Ticker, Year, and Month, and calculate max High
max_high_df = df2.groupBy("Ticker", "Year", "Month").agg(max("High").alias("MaxHigh"))

# Show the result
max_high_df.show()

end_time = time.time()

# Calculate the elapsed time
elapsed_time = end_time - start_time
print(f"Time taken to run the query: {elapsed_time} seconds")

StatementMeta(prdneudwcesyns, 36, 39, Finished, Available, Finished)

+------+----+-----+--------------------+
|Ticker|Year|Month|             MaxHigh|
+------+----+-----+--------------------+
|   CVX|1962|    1|0.046914567869972224|
|    BA|1962|    2|  0.2123341160576403|
|   MMM|1962|    3|  0.5588248109436902|
|    GT|1962|    7|  1.5118772841961448|
|   CAT|1962|   10| 0.11590789252862449|
|   DIS|1963|    5| 0.06025179644915724|
|    PG|1963|    8|  0.2472103907752564|
|    KR|1963|   12|0.029898689640230982|
|   MMM|1963|   12|  0.5349178824265026|
|    KR|1964|   10|0.035673110712459544|
|    PG|1964|   12|  0.2616721498644771|
|    BA|1964|   12|  0.3089230842982749|
|    KR|1965|    3| 0.04350065218647663|
|    IP|1965|    7|  0.8789791717211558|
|   MCD|1967|   12|0.011546064415855302|
|   HON|1968|    4|  1.1158002614974976|
|   CNP|1969|    9| 0.31558356931370435|
|   DTE|1970|    2| 0.41256187978109926|
|   XOM|1970|    5|  0.1612526778363936|
|   MCD|1970|    6| 0.00438635054789685|
+------+----+-----+--------------------+
only showing top

## optimization implementation

In [None]:
df = spark.read.load('abfss://filesystem@flumenbaron.dfs.core.windows.net/synapse/workspaces/warehouse/all_stock_data.csv', format='csv', header = True)

# Extract year from Date and add it as a new column
df = df.withColumn("Year", year(col("Date")))

# Repartition the DataFrame by the Year column
partitioned_df = df.repartition("Year")

# Save the partitioned DataFrame to a Parquet file
partitioned_df.write.partitionBy("Year").parquet("abfss://filesystem@flumenbaron.dfs.core.windows.net/synapse/workspaces/warehouse/all_stock_data.parquet")


StatementMeta(prdneudwcesyns, 36, 47, Finished, Available, Finished)

In [None]:
# Define the path to the CSV file
df3 = spark.read.load(
    'abfss://filesystem@flumenbaron.dfs.core.windows.net/synapse/workspaces/warehouse/all_stock_data.csv',
    format='parquet',
    header=True,
    inferSchema=True
)

df3 = df3.select("Date", "Ticker", "High")\
        .withColumn("Year", year(col("Date")))\
        .withColumn("Month", month(col("Date")))
 

# Extract year and month from Date
df3 = df3.withColumn("Year", year(col("Date"))).withColumn("Month", month(col("Date")))

StatementMeta(prdneudwcesyns, 36, 50, Finished, Available, Finished)

In [50]:
# Extract year and month from Date
df_opt = df3.withColumn("Year", year(col("Date"))).withColumn("Month", month(col("Date")))

# Start the timer
start_time = time.time()

# Group by Ticker, Year, and Month, and calculate max High
max_high_df = df_opt.groupBy("Ticker", "Year", "Month").agg(max("High").alias("MaxHigh"))

# Trigger an action to execute the transformations
max_high_df.show()

# Stop the timer
end_time = time.time()

# Calculate the elapsed time
elapsed_time = end_time - start_time
print(f"Time taken to run the query: {elapsed_time} seconds")

StatementMeta(prdneudwcesyns, 36, 51, Finished, Available, Finished)

+------+----+-----+------------------+
|Ticker|Year|Month|           MaxHigh|
+------+----+-----+------------------+
|     A|1999|   11|30.757480725338883|
|     A|2000|    8|39.369579881907896|
|     A|2002|    7|15.052704100851255|
|     A|2002|    9| 9.891603317935147|
|     A|2003|    2| 8.427548509398077|
|     A|2003|    4| 9.983874566925232|
|     A|2004|    1|23.867802475774685|
|     A|2004|   11|15.932374147296567|
|     A|2005|    5| 14.85585587663277|
|     A|2005|    6|15.409495064471285|
|     A|2005|   12| 22.20690469593362|
|     A|2006|   12|23.301044006029702|
|     A|2007|    4| 23.37285799781407|
|     A|2007|    6|25.749318295548324|
|     A|2007|   12|24.965866627893188|
|     A|2008|   11|16.367530080331825|
|     A|2008|   12| 9.884502004315255|
|     A|2009|    3| 9.760454276178807|
|     A|2010|    3|22.687340376925135|
|     A|2010|    4| 24.43704347695698|
+------+----+-----+------------------+
only showing top 20 rows

Time taken to run the query: 15.860332