### Fetch stock data using yfinance.

In [26]:
import yfinance as yf

stocks = ["AAPL", "MSFT", "GOOGL", "TSLA"]

# Fetch data for each stock and save as CSV
for stock in stocks:
    data = yf.download(stock, start="2020-01-01", end="2023-12-31")
    data.reset_index(inplace=True)
    data.to_csv(f"{stock}.csv", index=False)

[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed


In [27]:
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql.window import Window
from pyspark.sql.functions import col, sum, mean, min, corr, max, to_date, lag, lit
from pyspark.sql.types import *


In [28]:
spark_application_name = "Stock Analysis"
spark = (SparkSession.builder.appName(spark_application_name).getOrCreate())

Read the CSV files into Spark DataFrames and combine them.

In [29]:
schema = StructType([
    StructField("Date", DateType(), True),
    StructField("Adj Close", DoubleType(), True),
    StructField("Close", DoubleType(), True),
    StructField("High", DoubleType(), True),
    StructField("Low", DoubleType(), True),
    StructField("Open", DoubleType(), True),
    StructField("Volume", DoubleType(), True),
    StructField("Stock", StringType(), False)
])

In [30]:
dfs = []
for stock in stocks:
    # Read CSV with schema enforcement
    df = spark.read.csv(f"{stock}.csv", header=True, schema=schema)

    # Add a Stock column with the symbol
    df = df.withColumn("Stock", lit(stock))
    dfs.append(df)

# Combine all stocks into a single DataFrame
combined_df = dfs[0]
for df in dfs[1:]:
    combined_df = combined_df.union(df)

# Print the schema and preview the DataFrame
combined_df.printSchema()
combined_df.show(5)


root
 |-- Date: date (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Open: double (nullable = true)
 |-- Volume: double (nullable = true)
 |-- Stock: string (nullable = false)

+----------+-----------------+-----------------+-----------------+-----------------+-----------------+----------+-----+
|      Date|        Adj Close|            Close|             High|              Low|             Open|    Volume|Stock|
+----------+-----------------+-----------------+-----------------+-----------------+-----------------+----------+-----+
|      NULL|             NULL|             NULL|             NULL|             NULL|             NULL|      NULL| AAPL|
|2020-01-02|72.79603576660156| 75.0875015258789| 75.1500015258789|73.79750061035156|74.05999755859375|1.354804E8| AAPL|
|2020-01-03|72.08828735351562|74.35749816894531| 75.1449966430664|           74.125| 74.28749847412

In [31]:
# Check for null or invalid values
combined_df.select([col(c).isNull().alias(f"null_{c}") for c in combined_df.columns]).show()

# Confirm column data types
combined_df.printSchema()


+---------+--------------+----------+---------+--------+---------+-----------+----------+
|null_Date|null_Adj Close|null_Close|null_High|null_Low|null_Open|null_Volume|null_Stock|
+---------+--------------+----------+---------+--------+---------+-----------+----------+
|     true|          true|      true|     true|    true|     true|       true|     false|
|    false|         false|     false|    false|   false|    false|      false|     false|
|    false|         false|     false|    false|   false|    false|      false|     false|
|    false|         false|     false|    false|   false|    false|      false|     false|
|    false|         false|     false|    false|   false|    false|      false|     false|
|    false|         false|     false|    false|   false|    false|      false|     false|
|    false|         false|     false|    false|   false|    false|      false|     false|
|    false|         false|     false|    false|   false|    false|      false|     false|
|    false

### Pre-process the Data

Ensure the Date column is in the correct format.

In [32]:
combined_df = combined_df.withColumn("Date", to_date(col("Date"), "yyyy-MM-dd"))

Drop rows with missing values.

In [33]:
combined_df = combined_df.dropna()

Add Calculated Columns

In [34]:
window_spec = Window.partitionBy("Stock").orderBy("Date")

combined_df = combined_df.withColumn(
    "Prev_Close", lag("Close").over(window_spec)
).withColumn(
    "Daily_Return", (col("Close") - col("Prev_Close")) / col("Prev_Close")
)

combined_df.show(5)



+----------+-----------------+-----------------+-----------------+-----------------+-----------------+----------+-----+-----------------+--------------------+
|      Date|        Adj Close|            Close|             High|              Low|             Open|    Volume|Stock|       Prev_Close|        Daily_Return|
+----------+-----------------+-----------------+-----------------+-----------------+-----------------+----------+-----+-----------------+--------------------+
|2020-01-02|72.79603576660156| 75.0875015258789| 75.1500015258789|73.79750061035156|74.05999755859375|1.354804E8| AAPL|             NULL|                NULL|
|2020-01-03|72.08828735351562|74.35749816894531| 75.1449966430664|           74.125| 74.2874984741211|1.463228E8| AAPL| 75.0875015258789|-0.00972203551987...|
|2020-01-06|72.66272735595703|74.94999694824219|74.98999786376953|          73.1875|73.44750213623047|1.183872E8| AAPL|74.35749816894531|0.007968245219206775|
|2020-01-07|72.32096862792969|74.5975036621093

Calculate a 5-day moving average for closing prices.

In [35]:
combined_df = combined_df.withColumn(
    "Moving_Avg_5", mean("Close").over(window_spec.rowsBetween(-4, 0))
)
combined_df.show(5)


+----------+-----------------+-----------------+-----------------+-----------------+-----------------+----------+-----+-----------------+--------------------+-----------------+
|      Date|        Adj Close|            Close|             High|              Low|             Open|    Volume|Stock|       Prev_Close|        Daily_Return|     Moving_Avg_5|
+----------+-----------------+-----------------+-----------------+-----------------+-----------------+----------+-----+-----------------+--------------------+-----------------+
|2020-01-02|72.79603576660156| 75.0875015258789| 75.1500015258789|73.79750061035156|74.05999755859375|1.354804E8| AAPL|             NULL|                NULL| 75.0875015258789|
|2020-01-03|72.08828735351562|74.35749816894531| 75.1449966430664|           74.125| 74.2874984741211|1.463228E8| AAPL| 75.0875015258789|-0.00972203551987...|74.72249984741211|
|2020-01-06|72.66272735595703|74.94999694824219|74.98999786376953|          73.1875|73.44750213623047|1.183872E8| A

## Aggregation and Analysis

Calculate total, average, minimum, and maximum closing prices for each stock.

In [36]:
summary_stats = combined_df.groupBy("Stock").agg(
    sum("Close").alias("Total_Close"),
    mean("Close").alias("Avg_Close"),
    min("Close").alias("Min_Close"),
    max("Close").alias("Max_Close")
)
summary_stats.show()


+-----+------------------+------------------+------------------+------------------+
|Stock|       Total_Close|         Avg_Close|         Min_Close|         Max_Close|
+-----+------------------+------------------+------------------+------------------+
| AAPL|141652.98002624512|140.80813123881225|56.092498779296875|198.11000061035156|
| MSFT|  264358.620010376|262.78192843973756| 135.4199981689453|382.70001220703125|
|GOOGL|108517.46757125854| 107.8702460946904| 52.70650100708008| 149.8385009765625|
| TSLA| 210381.1292438507|209.12637101774425| 24.08133316040039| 409.9700012207031|
+-----+------------------+------------------+------------------+------------------+



Find the stock with the highest daily return.

In [37]:
highest_return = combined_df.orderBy(col("Daily_Return").desc()).select("Stock", "Date", "Daily_Return").first()
print(f"Highest daily return: {highest_return}")

Highest daily return: Row(Stock='TSLA', Date=datetime.date(2020, 2, 3), Daily_Return=0.19894859376394902)


In [40]:
combined_df.write.csv("processed_stocks1.csv", header=True)

 Show the First and Last 40 Rows

In [None]:
combined_df.show(40)
combined_df.orderBy(col("Date").desc()).limit(40).show(40, truncate=False)

Get the Number of Observations

In [None]:
num_observations = combined_df.count()
print(f"Number of observations: {num_observations}")


Deduce the Period Between Data Points

In [None]:
from pyspark.sql.functions import datediff

def deduce_period(df):
    # Ensure the DataFrame is sorted by Date
    df = df.orderBy("Date")

    # Calculate difference between consecutive dates
    window_spec = Window.orderBy("Date")
    df = df.withColumn("Prev_Date", lag("Date").over(window_spec))
    df = df.withColumn("Date_Diff", datediff(col("Date"), col("Prev_Date")))

    # Determine the most common period
    period = df.groupBy("Date_Diff").count().orderBy(col("count").desc()).first()
    print(f"Most common period: {period['Date_Diff']} days")
    return period['Date_Diff']
deduce_period(combined_df)

Descriptive Statistics

In [None]:
from pyspark.sql.functions import mean, stddev

def descriptive_statistics(df):
    stats = df.describe()
    print("Descriptive Statistics:")
    stats.show()
descriptive_statistics(combined_df)

In [None]:
from pyspark.sql.functions import col, sum

def count_missing_values(df):
    missing_counts = df.select(
        *[(sum(col(c).isNull().cast("int")).alias(f"missing_{c}")) for c in df.columns]
    )
    print("Missing Values:")
    missing_counts.show()
count_missing_values(combined_df)

Correlation Between Values

In [42]:
from pyspark.sql.functions import col, sum, corr

def calculate_correlation_matrix(df):
    # Get numeric columns
    numeric_cols = [col_name for col_name, dtype in df.dtypes if dtype in ("int", "double")]

    if not numeric_cols:
        print("No numeric columns available for correlation.")
        return

    correlation_matrix = {}

    print("Correlation Matrix:")
    for i, col1 in enumerate(numeric_cols):
        for col2 in numeric_cols[i:]:  # Avoid duplicate pairs
            if col1 == col2:
                correlation_value = 1.0
            else:
                correlation_value = df.stat.corr(col1, col2)

            correlation_matrix[(col1, col2)] = correlation_value
            correlation_matrix[(col2, col1)] = correlation_value

    # Print the matrix
    print(f"{'':<15}", end="")
    for col in numeric_cols:
        print(f"{col:<15}", end="")
    print()
    for col1 in numeric_cols:
        print(f"{col1:<15}", end="")
        for col2 in numeric_cols:
            print(f"{correlation_matrix[(col1, col2)]:<15.2f}", end="")
        print()

calculate_correlation_matrix(combined_df)

Correlation Matrix:
               Adj Close      Close          High           Low            Open           Volume         Prev_Close     Daily_Return   Moving_Avg_5   
Adj Close      1.00           1.00           1.00           1.00           1.00           -0.28          1.00           0.00           1.00           
Close          1.00           1.00           1.00           1.00           1.00           -0.29          1.00           0.00           1.00           
High           1.00           1.00           1.00           1.00           1.00           -0.28          1.00           -0.02          1.00           
Low            1.00           1.00           1.00           1.00           1.00           -0.30          1.00           -0.02          1.00           
Open           1.00           1.00           1.00           1.00           1.00           -0.29          1.00           -0.04          1.00           
Volume         -0.28          -0.29          -0.28          -0.30         

Average of Opening and Closing Prices for Each Stock (Week, Month, Year)

In [23]:
def calculate_average_prices_for_stock(df, stock=None):
    from pyspark.sql.functions import year, month, weekofyear, avg

    df = df.withColumn("Year", year("Date")) \
           .withColumn("Month", month("Date")) \
           .withColumn("Week", weekofyear("Date"))

    if stock:
        df = df.filter(df["Stock"] == stock)

    avg_prices = df.groupBy("Stock", "Year", "Month", "Week") \
                   .agg(avg("Open").alias("Avg_Open"), avg("Close").alias("Avg_Close"))

    avg_prices.orderBy("Stock", "Year", "Month", "Week").show(50)  # Adjust number of rows displayed as needed
    return avg_prices

calculate_average_prices_for_stock(combined_df, stock="AAPL")
calculate_average_prices_for_stock(combined_df, stock="MSFT")
calculate_average_prices_for_stock(combined_df, stock="GOOGL")
calculate_average_prices_for_stock(combined_df, stock="TSLA")


+-----+----+-----+----+------------------+------------------+
|Stock|Year|Month|Week|          Avg_Open|         Avg_Close|
+-----+----+-----+----+------------------+------------------+
| AAPL|2020|    1|   1| 74.17374801635742| 74.72249984741211|
| AAPL|2020|    1|   2| 75.43150024414062| 76.06699981689454|
| AAPL|2020|    1|   3| 78.50250091552735| 78.74749908447265|
| AAPL|2020|    1|   4| 79.62125015258789| 79.48812675476074|
| AAPL|2020|    1|   5| 79.42900085449219| 79.21800079345704|
| AAPL|2020|    2|   6| 79.40349884033203| 79.71000061035156|
| AAPL|2020|    2|   7| 80.40899963378907| 80.90899963378907|
| AAPL|2020|    2|   8| 79.78812408447266| 79.74812316894531|
| AAPL|2020|    2|   9| 71.15500183105469| 71.28949737548828|
| AAPL|2020|    3|  10| 72.99550018310546| 73.64100036621093|
| AAPL|2020|    3|  11| 66.95550231933593| 67.65699996948243|
| AAPL|2020|    3|  12|61.189999389648435| 60.78800048828125|
| AAPL|2020|    3|  13| 60.72300033569336|61.147500610351564|
| AAPL|2

DataFrame[Stock: string, Year: int, Month: int, Week: int, Avg_Open: double, Avg_Close: double]

Daily and Monthly Changes in Stock Prices


In [24]:
combined_df.select("Stock").distinct().show()


+-----+
|Stock|
+-----+
| AAPL|
| MSFT|
|GOOGL|
| TSLA|
+-----+



In [25]:
def calculate_daily_return(df):
    df = df.withColumn("Daily_Return", (col("Close") - lag("Close").over(window_spec)) / lag("Close").over(window_spec))

    df.show(10)
    return df
calculate_daily_return(combined_df)

+----------+-----------------+-----------------+-----------------+-----------------+-----------------+----------+-----+-----------------+--------------------+-----------------+
|      Date|        Adj Close|            Close|             High|              Low|             Open|    Volume|Stock|       Prev_Close|        Daily_Return|     Moving_Avg_5|
+----------+-----------------+-----------------+-----------------+-----------------+-----------------+----------+-----+-----------------+--------------------+-----------------+
|2020-01-02|72.79603576660156| 75.0875015258789| 75.1500015258789|73.79750061035156|74.05999755859375|1.354804E8| AAPL|             NULL|                NULL| 75.0875015258789|
|2020-01-03|72.08828735351562|74.35749816894531| 75.1449966430664|           74.125| 74.2874984741211|1.463228E8| AAPL| 75.0875015258789|-0.00972203551987...|74.72249984741211|
|2020-01-06|72.66272735595703|74.94999694824219|74.98999786376953|          73.1875|73.44750213623047|1.183872E8| A

DataFrame[Date: date, Adj Close: double, Close: double, High: double, Low: double, Open: double, Volume: double, Stock: string, Prev_Close: double, Daily_Return: double, Moving_Avg_5: double]

To find the stocks with the highest daily return we do the following:

In [43]:
from pyspark.sql.functions import col, max

def get_stock_with_highest_daily_return(df):
    # Find the maximum daily return for each stock
    stock_max_return = df.groupBy("Stock").agg(max("Daily_Return").alias("Max_Daily_Return"))

    # Find the stock with the overall highest daily return
    highest_return = stock_max_return.orderBy(col("Max_Daily_Return").desc()).limit(1)

    # Show the results
    stock_max_return.show()  # Max daily return per stock
    highest_return.show()    # Stock with the highest daily return overall

    return stock_max_return, highest_return

stock_max_return, highest_return = get_stock_with_highest_daily_return(combined_df)


+-----+-------------------+
|Stock|   Max_Daily_Return|
+-----+-------------------+
| AAPL| 0.1198082665473158|
|GOOGL|  0.092411527026541|
| MSFT|0.14216888119914378|
| TSLA|0.19894859376394902|
+-----+-------------------+

+-----+-------------------+
|Stock|   Max_Daily_Return|
+-----+-------------------+
| TSLA|0.19894859376394902|
+-----+-------------------+



Calculate the average daily return for different periods like week, month, and year

In [44]:
from pyspark.sql.functions import year, month, weekofyear, avg

def calculate_average_daily_return(df):

    # Extract year, month, and week information
    df = df.withColumn("Year", year("Date")) \
           .withColumn("Month", month("Date")) \
           .withColumn("Week", weekofyear("Date"))

    # Calculate average daily return per week
    weekly_avg_return = df.groupBy("Stock", "Year", "Week") \
                          .agg(avg("Daily_Return").alias("Avg_Daily_Return_Weekly"))

    # Calculate average daily return per month
    monthly_avg_return = df.groupBy("Stock", "Year", "Month") \
                           .agg(avg("Daily_Return").alias("Avg_Daily_Return_Monthly"))

    # Calculate average daily return per year
    yearly_avg_return = df.groupBy("Stock", "Year") \
                          .agg(avg("Daily_Return").alias("Avg_Daily_Return_Yearly"))

    # Display the results
    print("Weekly Average Daily Return:")
    weekly_avg_return.orderBy("Stock", "Year", "Week").show(10)

    print("Monthly Average Daily Return:")
    monthly_avg_return.orderBy("Stock", "Year", "Month").show(10)

    print("Yearly Average Daily Return:")
    yearly_avg_return.orderBy("Stock", "Year").show(10)

    return weekly_avg_return, monthly_avg_return, yearly_avg_return

weekly_avg, monthly_avg, yearly_avg = calculate_average_daily_return(combined_df)


Weekly Average Daily Return:
+-----+----+----+-----------------------+
|Stock|Year|Week|Avg_Daily_Return_Weekly|
+-----+----+----+-----------------------+
| AAPL|2020|   1|   -0.00972203551987...|
| AAPL|2020|   2|    0.00857060102070845|
| AAPL|2020|   3|   0.005434609657911612|
| AAPL|2020|   4|   -3.18374412677957...|
| AAPL|2020|   5|   -0.00519428282555...|
| AAPL|2020|   6|   0.006838098524189764|
| AAPL|2020|   7|   0.003117847997880882|
| AAPL|2020|   8|   -0.00918045046484...|
| AAPL|2020|   9|   -0.02629241490788...|
| AAPL|2020|  10|   0.012401776797485784|
+-----+----+----+-----------------------+
only showing top 10 rows

Monthly Average Daily Return:
+-----+----+-----+------------------------+
|Stock|Year|Month|Avg_Daily_Return_Monthly|
+-----+----+-----+------------------------+
| AAPL|2020|    1|    0.001652955304736...|
| AAPL|2020|    2|    -0.00623179704149...|
| AAPL|2020|    3|    -0.00126147907067...|
| AAPL|2020|    4|    0.007338761990882674|
| AAPL|2020|    5| 

Yearly Returns: The yearly average daily returns show that most stocks maintained relatively stable performance, with values typically close to zero. However, certain years (AAPL in 2022 with -9.92) exhibit significant negative performance, potentially indicating a challenging year for that stock.

Monthly and Weekly Volatility: The monthly and weekly average daily returns for AAPL show fluctuations, with some months (May 2020 and July 2020) having positive returns, while others (February 2020 and September 2020) show negative returns. These variations reflect the inherent volatility of stock prices over shorter periods.



  Calculate the moving average for a specified column and period.
    
    :param df: PySpark DataFrame
    :param column_name: The name of the column to calculate the moving average for
    :param num_points: The number of periods to consider for the moving average
    :return: DataFrame with a new column containing the moving average
  

In [45]:
from pyspark.sql.window import Window
from pyspark.sql.functions import avg, col

def calculate_moving_average(df, column_name, num_points):

    # Define a window for calculating the moving average
    window_spec = Window.partitionBy("Stock").orderBy("Date").rowsBetween(-num_points + 1, 0)

    # Calculate the moving average
    moving_avg_column = f"Moving_Avg_{column_name}_{num_points}"
    df = df.withColumn(moving_avg_column, avg(col(column_name)).over(window_spec))

    # Show a sample of the resulting DataFrame
    df.show(10)

    return df

# Calculate the 5-period moving average for the 'Open' column
combined_df_with_moving_avg = calculate_moving_average(combined_df, "Open", 5)


+----------+-----------------+-----------------+-----------------+-----------------+-----------------+----------+-----+-----------------+--------------------+-----------------+-----------------+
|      Date|        Adj Close|            Close|             High|              Low|             Open|    Volume|Stock|       Prev_Close|        Daily_Return|     Moving_Avg_5|Moving_Avg_Open_5|
+----------+-----------------+-----------------+-----------------+-----------------+-----------------+----------+-----+-----------------+--------------------+-----------------+-----------------+
|2020-01-02|72.79603576660156| 75.0875015258789| 75.1500015258789|73.79750061035156|74.05999755859375|1.354804E8| AAPL|             NULL|                NULL| 75.0875015258789|74.05999755859375|
|2020-01-03|72.08828735351562|74.35749816894531| 75.1449966430664|           74.125| 74.2874984741211|1.463228E8| AAPL| 75.0875015258789|-0.00972203551987...|74.72249984741211|74.17374801635742|
|2020-01-06|72.6627273559

Analyze the Results Across All Stocks:

In [46]:
from pyspark.sql.functions import avg

avg_moving_avg = combined_df_with_moving_avg.groupBy("Stock").agg(
    avg("Moving_Avg_Open_5").alias("Avg_Moving_Avg_Open_5")
)
avg_moving_avg.show()

+-----+---------------------+
|Stock|Avg_Moving_Avg_Open_5|
+-----+---------------------+
| AAPL|   140.43777437216406|
|GOOGL|   107.65696952572068|
| MSFT|   262.23848399446064|
| TSLA|   208.74314159342094|
+-----+---------------------+



We can see that MSFT has the highest average 5-period moving average of opening prices (262.24), indicating consistently higher opening prices compared to other stocks.
GOOGL has the lowest average moving average (107.65), suggesting it has lower overall opening prices in comparison.
Finally, TSLA and AAPL are in the middle range, with TSLA showing slightly higher average opening prices than AAPL.
This indicates varying price levels and trends among the stocks over time.

  Calculate the correlation between two stocks based on a specified column.
    
    :param df: PySpark DataFrame containing stock data
    :param stock1: The first stock symbol (e.g., "AAPL")
    :param stock2: The second stock symbol (e.g., "MSFT")
    :param column: The column to calculate correlation on
    :return: Correlation value between the two stocks
    

In [47]:

def calculate_correlation_between_stocks(df, stock1, stock2, column="Close"):

    # Filter data for the two stocks
    stock1_data = df.filter(col("Stock") == stock1).select("Date", col(column).alias(f"{column}_{stock1}"))
    stock2_data = df.filter(col("Stock") == stock2).select("Date", col(column).alias(f"{column}_{stock2}"))

    # Join the two stocks' data on Date
    joined_data = stock1_data.join(stock2_data, on="Date", how="inner")

    # Calculate the correlation
    correlation = joined_data.stat.corr(f"{column}_{stock1}", f"{column}_{stock2}")

    print(f"Correlation between {stock1} and {stock2} based on {column}: {correlation}")
    return correlation

calculate_correlation_between_stocks(combined_df, "AAPL", "MSFT", column="Close")
calculate_correlation_between_stocks(combined_df, "AAPL", "GOOGL", column="Close")
calculate_correlation_between_stocks(combined_df, "AAPL", "TSLA", column="Close")
calculate_correlation_between_stocks(combined_df, "MSFT", "GOOGL", column="Close")
calculate_correlation_between_stocks(combined_df, "MSFT", "TSLA", column="Close")
calculate_correlation_between_stocks(combined_df, "GOOGL", "TSLA", column="Close")


Correlation between AAPL and MSFT based on Close: 0.9344927047414922
Correlation between AAPL and GOOGL based on Close: 0.8321030784933668
Correlation between AAPL and TSLA based on Close: 0.7982315983854091
Correlation between MSFT and GOOGL based on Close: 0.9105887428487369
Correlation between MSFT and TSLA based on Close: 0.7592030550662611
Correlation between GOOGL and TSLA based on Close: 0.8465239627012778


0.8465239627012778

We notice that strongest correlation: AAPL and MSFT (0.93) show the highest correlation, reflecting similar market trends in the tech sector.

Moderate correlation: AAPL and GOOGL (0.83) and GOOGL and TSLA (0.84) indicate moderately similar price movements.

Weakest correlation: MSFT and TSLA (0.76) and AAPL and TSLA (0.79) suggest TSLA is less influenced by tech market trends.

  Calculate return rate from grouped data that includes first and last close prices.
    
    :param df_grouped: PySpark DataFrame with grouped data containing first and last close columns
    :param first_column: Name of the column with the first close price
    :param last_column: Name of the column with the last close price
    :param return_column: Name of the output column for return rate
    :return: DataFrame with calculated return rate


In [54]:
from pyspark.sql.functions import first, last, col

def calculate_return_rate(df):

    # Add Year, Month, and Week columns
    df = df.withColumn("Year", year("Date")) \
           .withColumn("Month", month("Date")) \
           .withColumn("Week", weekofyear("Date"))

    # Calculate return rate per week
    weekly_return = df.groupBy("Stock", "Year", "Week").agg(
        first("Close").alias("First_Close"),
        last("Close").alias("Last_Close")
    ).withColumn("Weekly_Return_Rate", ((col("Last_Close") - col("First_Close")) / col("First_Close")) * 100)

    # Calculate return rate per month
    monthly_return = df.groupBy("Stock", "Year", "Month").agg(
        first("Close").alias("First_Close"),
        last("Close").alias("Last_Close")
    ).withColumn("Monthly_Return_Rate", ((col("Last_Close") - col("First_Close")) / col("First_Close")) * 100)

    # Calculate return rate per year
    yearly_return = df.groupBy("Stock", "Year").agg(
        first("Close").alias("First_Close"),
        last("Close").alias("Last_Close")
    ).withColumn("Yearly_Return_Rate", ((col("Last_Close") - col("First_Close")) / col("First_Close")) * 100)

    # Display the results
    print("Weekly Return Rate:")
    weekly_return.orderBy("Stock", "Year", "Week").show(10)

    print("Monthly Return Rate:")
    monthly_return.orderBy("Stock", "Year", "Month").show(10)

    print("Yearly Return Rate:")
    yearly_return.orderBy("Stock", "Year").show(10)

    return weekly_return, monthly_return, yearly_return


In [55]:
weekly_return, monthly_return, yearly_return = calculate_return_rate(combined_df)

Weekly Return Rate:
+-----+----+----+-----------------+-----------------+-------------------+
|Stock|Year|Week|      First_Close|       Last_Close| Weekly_Return_Rate|
+-----+----+----+-----------------+-----------------+-------------------+
| AAPL|2020|   1| 75.0875015258789|74.35749816894531|-0.9722035519879405|
| AAPL|2020|   2|74.94999694824219| 77.5824966430664|  3.512341296881079|
| AAPL|2020|   3|79.23999786376953|79.68250274658203|  0.558436262925272|
| AAPL|2020|   4|79.14250183105469|79.57749938964844| 0.5496383719614251|
| AAPL|2020|   5|77.23750305175781|77.37750244140625| 0.1812583060260534|
| AAPL|2020|   6|77.16500091552734|80.00749969482422|   3.68366324832752|
| AAPL|2020|   7|80.38749694824219|81.23750305175781|  1.057385956503789|
| AAPL|2020|   8|            79.75|78.26249694824219|-1.8652075884110502|
| AAPL|2020|   9|74.54499816894531|68.33999633789062| -8.323833903640269|
| AAPL|2020|  10|74.70249938964844|72.25749969482422|-3.2729824501200335|
+-----+----+----+-

The weekly returns for AAPL in 2020 were generally stable, with minor fluctuations. The highest weekly return was +3.68% in week 6, while the largest drop was -8.32% in week 9, indicating some short-term volatility. In contrast, the monthly returns exhibited greater volatility, ranging from a sharp decline of -14.89% in March 2020 to a strong recovery of +21.95% in April 2020. This highlights the significant market recovery following initial losses. Overall, early 2020 was marked by steep declines and recoveries, while later months, such as October 2020, showed declining performance, reflecting a potential market slowdown towards the end of the year.


  Calculate the stock with the best return rate for a specific month or year.
    
    :param df: PySpark DataFrame containing stock data with "Date", "Stock", and "Close".
    :param start_date: The start date (e.g., "2020-01-01").
    :param period: The period to analyze ("month" or "year").
    :return: The stock with the best return rate for the specified period.
   

In [57]:
from pyspark.sql.functions import col, first, last, year, month

def best_return_rate(df, start_date, period="month"):

    # Extract Year and Month from the Date column
    df = df.withColumn("Year", year("Date")).withColumn("Month", month("Date"))

    # Parse the start date
    start_year, start_month, _ = map(int, start_date.split("-"))

    # Filter data based on the specified period
    if period == "month":
        filtered_df = df.filter((col("Year") == start_year) & (col("Month") == start_month))
        group_by_cols = ["Stock", "Year", "Month"]
    elif period == "year":
        filtered_df = df.filter(col("Year") == start_year)
        group_by_cols = ["Stock", "Year"]
    else:
        raise ValueError("Invalid period. Choose 'month' or 'year'.")

    # Group data and calculate return rate
    return_rate_df = filtered_df.groupBy(group_by_cols).agg(
        first("Close").alias("First_Close"),
        last("Close").alias("Last_Close")
    ).withColumn("Return_Rate", ((col("Last_Close") - col("First_Close")) / col("First_Close")) * 100)

    # Find the stock with the best return rate
    best_stock = return_rate_df.orderBy(col("Return_Rate").desc()).limit(1)

    # Show the result
    best_stock.show()

    return best_stock


In [58]:
# Find the stock with the best return rate for January 2020
best_stock = best_return_rate(combined_df, "2020-01-01", period="month")

# Find the stock with the best return rate for 2020
best_stock_year = best_return_rate(combined_df, "2020-01-01", period="year")


+-----+----+-----+-----------------+------------------+-----------------+
|Stock|Year|Month|      First_Close|        Last_Close|      Return_Rate|
+-----+----+-----+-----------------+------------------+-----------------+
| TSLA|2020|    1|28.68400001525879|43.371334075927734|51.20392571766785|
+-----+----+-----+-----------------+------------------+-----------------+

+-----+----+-----------------+------------------+-----------------+
|Stock|Year|      First_Close|        Last_Close|      Return_Rate|
+-----+----+-----------------+------------------+-----------------+
| TSLA|2020|28.68400001525879|235.22332763671875|720.0506467423962|
+-----+----+-----------------+------------------+-----------------+



For January 2020, Tesla achieved the highest return rate of 51.20%, indicating significant growth in its stock price during the month. For the entire year of 2020, Tesla maintained its position as the top-performing stock with a staggering annual return rate of 720.05%, highlighting its remarkable performance and market demand throughout the year.