In [1]:
!pip install findspark




[notice] A new release of pip available: 22.3.1 -> 23.3.2
[notice] To update, run: python.exe -m pip install --upgrade pip





In [2]:
!pip install pyspark




[notice] A new release of pip available: 22.3.1 -> 23.3.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [3]:
import findspark
from pyspark.sql import SparkSession

In [37]:
spark = SparkSession.builder.appName('StockAnalysis').getOrCreate()

In [39]:
df_single = spark.read.csv('misc/GOOGLE.csv', header=True, inferSchema=True)
df_single.printSchema()
df_single.show()
df_another = spark.read.csv('misc/FACEBOOK.csv', header=True, inferSchema=True)
df_another.printSchema()
df_another.show()

root
 |-- Date: date (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Open: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- company_name: string (nullable = true)

+----------+-----------------+-----------------+-----------------+-----------------+-------+-----------------+------------+
|      Date|             High|              Low|             Open|            Close| Volume|        Adj Close|company_name|
+----------+-----------------+-----------------+-----------------+-----------------+-------+-----------------+------------+
|2017-01-03|789.6300048828125|775.7999877929688|778.8099975585938|786.1400146484375|1657300|786.1400146484375|      GOOGLE|
|2017-01-04|791.3400268554688|783.1599731445312|788.3599853515625|786.9000244140625|1073000|786.9000244140625|      GOOGLE|
|2017-01-05|  794.47998046875|  785.02001953125|786.0800170898438|  794.

We can see that it seem that the datas possesed the same structure we can then all load them in one time inside a Dataframe. We will need later to check for next files if they share the same structure to merge them

In [40]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from os import listdir
from os.path import isfile, join

def check_and_load_files(folder_path):
    spark = SparkSession.builder.appName('StockAnalysis').getOrCreate()

    file_list = [f for f in listdir(folder_path) if isfile(join(folder_path, f))]
    first_schema = None
    main_df = None
    mismatched_files = []

    for file in file_list:
        df = spark.read.csv(join(folder_path, file), header=True, inferSchema=True)
        if first_schema is None:
            first_schema = df.schema
            main_df = df
        else:
            if df.schema != first_schema:
                mismatched_files.append(file)

    if mismatched_files:
        print("The following files have mismatched schemas and were not loaded:")
        for file in mismatched_files:
            print(file)
    else:
        main_df = spark.read.csv(folder_path + '/*.csv', header=True, schema=first_schema)
    return main_df

Here we implement a little function to help us only load data with the same sheme contain into a directory it will help for next steps

## <b><div style='padding:15px;background-color:#003f88;color:white;border-radius:2px;font-size:110%;text-align: center'>1  |  Data exploration</div></b>

In [41]:
###loading temporaire

In [42]:
df_google = spark.read.csv('misc/GOOGLE.csv', header=True, inferSchema=True)
df_facebook = spark.read.csv('misc/FACEBOOK.csv', header=True, inferSchema=True)
df_apple = spark.read.csv('misc/APPLE.csv', header=True, inferSchema=True)
df_amazon = spark.read.csv('misc/AMAZON.csv', header=True, inferSchema=True)
df_tesla = spark.read.csv('misc/TESLA.csv', header=True, inferSchema=True)
df_zoom = spark.read.csv('misc/ZOOM.csv', header=True, inferSchema=True)
df_microsoft = spark.read.csv('misc/MICROSOFT.csv', header=True, inferSchema=True)

In [9]:
df_all = df_google.union(df_facebook)\
                  .union(df_apple)\
                  .union(df_amazon)\
                  .union(df_tesla)\
                  .union(df_zoom)\
                  .union(df_microsoft)

### <b><span style='color:#DEB078'>1.1 |</span><span style='color:#003f88'> load the data</span></b>  

In [10]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DateType, DoubleType, StringType
from pyspark.sql.functions import col
from pyspark.sql import functions as F
import datetime
from pyspark.sql.types import*
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, isnan, when, count, unix_timestamp, lag
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, DateType, DoubleType, StringType, TimestampType,IntegerType

def read_and_validate_csv(file_path):
    expected_schema = StructType([
        StructField("Date", DateType(), True),
        StructField("High", DoubleType(), True),
        StructField("Low", DoubleType(), True),
        StructField("Open", DoubleType(), True),
        StructField("Close", DoubleType(), True),
        StructField("Volume", DoubleType(), True),
        StructField("Adj Close", DoubleType(), True),
        StructField("company_name", StringType(), True)
    ])

    df = spark.read.csv(file_path, header=True, inferSchema=True)

    if len(df.columns) == len(expected_schema) and \
       all([f.dataType == df.schema[i].dataType for i, f in enumerate(expected_schema)]):
        for i, field in enumerate(expected_schema):
            df = df.withColumnRenamed(df.columns[i], field.name)
        return df
    else:
        raise ValueError("CSV file schema does not match the expected schema.")

### <b><span style='color:#DEB078'>1.2 |</span><span style='color:#003f88'> merge the datas</span></b>  

In [11]:
df_all.printSchema()
num_observations = df_all.count()
print(f"Number of observations: {num_observations}")

root
 |-- Date: date (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Open: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- company_name: string (nullable = true)

Number of observations: 6333


In [12]:
df_all.show(truncate=False,n=40)  # First 40 rows
df_all.orderBy(df_all["Date"].desc()).show(truncate=False,n=40)  # Last 40 rows, replace 'YourDateColumn'

+----------+-----------------+-----------------+-----------------+-----------------+---------+-----------------+------------+
|Date      |High             |Low              |Open             |Close            |Volume   |Adj Close        |company_name|
+----------+-----------------+-----------------+-----------------+-----------------+---------+-----------------+------------+
|2017-01-03|789.6300048828125|775.7999877929688|778.8099975585938|786.1400146484375|1657300.0|786.1400146484375|GOOGLE      |
|2017-01-04|791.3400268554688|783.1599731445312|788.3599853515625|786.9000244140625|1073000.0|786.9000244140625|GOOGLE      |
|2017-01-05|794.47998046875  |785.02001953125  |786.0800170898438|794.02001953125  |1335200.0|794.02001953125  |GOOGLE      |
|2017-01-06|807.9000244140625|792.2039794921875|795.260009765625 |806.1500244140625|1640200.0|806.1500244140625|GOOGLE      |
|2017-01-09|809.9660034179688|802.8300170898438|806.4000244140625|806.6500244140625|1274600.0|806.6500244140625|GOOGLE

In [13]:
df_all.describe().show()

+-------+------------------+------------------+------------------+-----------------+-------------------+------------------+------------+
|summary|              High|               Low|              Open|            Close|             Volume|         Adj Close|company_name|
+-------+------------------+------------------+------------------+-----------------+-------------------+------------------+------------+
|  count|              6333|              6333|              6333|             6333|               6333|              6333|        6333|
|   mean| 544.0228364162303| 531.5214318489739|  537.936037946346|538.0659504301191|3.616260520969525E7| 537.4496728170357|        NULL|
| stddev| 704.5601710027677| 688.2614226133601| 696.8556864449436| 696.675025729266|5.015634459633583E7| 697.0769679550222|        NULL|
|    min|29.082500457763672|28.690000534057617|28.950000762939453| 29.0049991607666|           285821.0|27.247108459472656|      AMAZON|
|    max|           3552.25|  3486.689941

### <b><span style='color:#DEB078'>1.3 |</span><span style='color:#003f88'> Data insights explanation</span></b>  

<div style="width:100%;padding:10px;background-color:#f2f2f2;border-radius:5px;">
    
<b>Number of observation:</b>
<p>There is currently 6333 observation in our dataset that regroup the 7 files</p>

<b>Range of Stock Prices:</b>
<p>The min and max values across High, Low, Open, and Close columns show a wide range in stock prices, indicating a diverse set of companies.</p>

<b>Volatility:</b>
<p>High standard deviation values suggest significant volatility in stock prices among these companies.</p>

<b>Average Prices:</b>
<p>The mean values for High, Low, Open, and Close are close to each other, indicating that stock prices don’t fluctuate wildly within a trading day.</p>

<b>Trading Volume:</b>
<p>Substantial variation in trading volumes is observed, with some stocks traded much more heavily than others.</p>

<b>Adjusted Close Prices:</b>
<p>Adjusted Close prices generally align with Close prices but are slightly lower on average, accounting for dividends and stock splits.</p>

<b>Company-Specific Observations:</b>
<p>Stocks from companies like AMAZON and ZOOM show significantly different price ranges and trading volumes.</p>

<b>Data Completeness:</b>
<p>No missing values in key columns, and the company_name mean column null value is normal because of the non-numeric.</p>
</div>


In [14]:
def deduce_adaptive_time_interval(df, date_column_name="Date"):
    if date_column_name not in df.columns:
        raise ValueError(f"Column '{date_column_name}' not found in DataFrame")

    if df.rdd.isEmpty():
        raise ValueError("DataFrame is empty")

    df = df.withColumn(date_column_name, F.col(date_column_name).cast(TimestampType()))
    windowSpec = Window.orderBy(date_column_name)
    df_with_lag = df.withColumn("PrevDate", F.lag(df[date_column_name]).over(windowSpec))
    df_with_diff = df_with_lag.withColumn("TimeDiffSeconds", F.unix_timestamp(df[date_column_name]) - F.unix_timestamp(df_with_lag["PrevDate"]))

    most_common_diff = df_with_diff.filter(df_with_diff["TimeDiffSeconds"] > 0)\
                                   .groupBy("TimeDiffSeconds").count()\
                                   .orderBy("count", ascending=False).first()

    if most_common_diff is None or most_common_diff["TimeDiffSeconds"] is None:
        print("No consistent interval found")
        return None

    seconds = most_common_diff["TimeDiffSeconds"]
    interval_str = ''
    if seconds >= 86400:
        interval_str = f"{seconds // 86400} day(s)"
    elif seconds >= 3600:
        interval_str = f"{seconds // 3600} hour(s)"
    elif seconds >= 60:
        interval_str = f"{seconds // 60} minute(s)"
    else:
        interval_str = f"{seconds} second(s)"

    print(f"The most common time interval between data points is {interval_str}.")
    return interval_str


time_interval = deduce_adaptive_time_interval(df_all)

The most common time interval between data points is 1 day(s).



<div style="width:100%;padding:10px;background-color:#f2f2f2;border-radius:5px;">

<p><b>deduce_adaptive_time_interval:</b> This function analyzes time-series data in <code>df_all</code> to find the most common interval between data points. It's crucial for understanding data frequency in aggregated datasets from varied sources. The function dynamically categorizes intervals into appropriate units (seconds, minutes, hours, days) and returns this interval as a string, aiding in subsequent temporal analyses.</p>

</div>


### <b><span style='color:#DEB078'>1.4 |</span><span style='color:#003f88'> Containerize the code</span></b>  

<div style="width:100%;padding:10px;background-color:#f2f2f2;border-radius:5px;">

<p> With all these infos we can now create a class to get all infos we need about a single file that we load.</p>

</div>

In [15]:
from pyspark.sql.functions import corr
from pyspark.sql.functions import lit
from pyspark.sql.functions import weekofyear, month, year, avg
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import lag, datediff, to_date, last_day, months_between
from pyspark.sql.functions import rank, desc, first, last, min, max
from pyspark.sql.window import Window

class StockDataAnalyzer:
    def __init__(self, file_path):
        self.spark = SparkSession.builder.appName('StockAnalysis').getOrCreate()
        self.file_path = file_path
        self.df = None

    def read_and_validate_csv(self):
        expected_schema = StructType([
        StructField("Date", DateType(), True),
        StructField("High", DoubleType(), True),
        StructField("Low", DoubleType(), True),
        StructField("Open", DoubleType(), True),
        StructField("Close", DoubleType(), True),
        StructField("Volume", IntegerType(), True),
        StructField("Adj Close", DoubleType(), True),
        StructField("company_name", StringType(), True)
        ])

        df = spark.read.csv(self.file_path, header=True, inferSchema=True)
        if len(df.columns) == len(expected_schema) and \
           all([f.dataType == df.schema[i].dataType for i, f in enumerate(expected_schema)]):
            for i, field in enumerate(expected_schema):
                df = df.withColumnRenamed(df.columns[i], field.name)
            self.df = df
        else:
            raise ValueError("CSV file schema does not match the expected schema.")

    def get_basic_statistics(self):
        if self.df is not None:
            return self.df.describe()
        else:
            raise ValueError("DataFrame is not loaded.")

    def get_number_of_observations(self):
        if self.df is not None:
            return self.df.count()
        else:
            raise ValueError("DataFrame is not loaded.")

    def get_null_values_count(self):
        if self.df is not None:
            return self.df.select([count(when(col(c).isNull(), c)).alias(c) for c in self.df.columns])
        else:
            raise ValueError("DataFrame is not loaded.")


    def deduce_adaptive_time_interval(self, date_column_name="Date"):
        if self.df is None:
            raise ValueError("DataFrame is not loaded")

        df = self.df.withColumn(date_column_name, col(date_column_name).cast(TimestampType()))
        windowSpec = Window.orderBy(date_column_name)
        df_with_lag = df.withColumn("PrevDate", lag(df[date_column_name]).over(windowSpec))
        df_with_diff = df_with_lag.withColumn("TimeDiffSeconds", unix_timestamp(df[date_column_name]) - unix_timestamp(df_with_lag["PrevDate"]))

        most_common_diff = df_with_diff.filter(df_with_diff["TimeDiffSeconds"] > 0)\
                                      .groupBy("TimeDiffSeconds").count()\
                                      .orderBy("count", ascending=False).first()

        if most_common_diff is None or most_common_diff["TimeDiffSeconds"] is None:
            return "No consistent interval found"

        seconds = most_common_diff["TimeDiffSeconds"]
        if seconds >= 86400:
            return f"{seconds // 86400} day(s)"
        elif seconds >= 3600:
            return f"{seconds // 3600} hour(s)"
        elif seconds >= 60:
            return f"{seconds // 60} minute(s)"
        else:
            return f"{seconds} second(s)"

    def compute_correlation_matrix(self):
        if self.df is None:
            raise ValueError("DataFrame is not loaded.")

        numeric_columns = [f.name for f in self.df.schema.fields if isinstance(f.dataType, (DoubleType, IntegerType))]

        correlation_values = {}

        for col1 in numeric_columns:
            for col2 in numeric_columns:
                if (col2, col1) not in correlation_values:
                    correlation = self.df.stat.corr(col1, col2)
                    correlation_values[(col1, col2)] = correlation

        matrix_data = []
        for col1 in numeric_columns:
            row_data = {col2: correlation_values.get((col1, col2)) or correlation_values.get((col2, col1)) for col2 in numeric_columns}
            row_data['RowLabel'] = col1
            matrix_data.append(Row(**row_data))

        correlation_matrix_df = self.spark.createDataFrame(matrix_data)

        ordered_columns = ['RowLabel'] + numeric_columns
        correlation_matrix_df = correlation_matrix_df.select(*ordered_columns)

        return correlation_matrix_df


    def calculate_return_rate(self, period):
        """
        Calculate the return rate of the stock for different periods (week, month, year).

        Parameters:
        period (str): The period for which to calculate the return rate. Must be 'week', 'month', or 'year'.
        """
        if self.df is None:
            raise ValueError("DataFrame is not loaded.")

        # Déterminez la colonne de période
        if period == 'week':
            period_col = weekofyear(col('Date'))
        elif period == 'month':
            period_col = month(col('Date'))
        elif period == 'year':
            period_col = year(col('Date'))
        else:
            raise ValueError("Period must be 'week', 'month', or 'year'.")

        # Créer une fenêtre pour le calcul
        windowSpec = Window.partitionBy('company_name', period_col).orderBy('Date')
        windowSpecUnbounded = windowSpec.rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)

        # Calculer le premier et le dernier prix dans chaque période
        self.df = self.df.withColumn('FirstPrice', first(col('Close')).over(windowSpec))
        self.df = self.df.withColumn('LastPrice', last(col('Close')).over(windowSpecUnbounded))

        # Calculer le taux de rendement
        return_rate_df = self.df.withColumn('ReturnRate', (col('LastPrice') - col('FirstPrice')) / col('FirstPrice'))\
                                .select('company_name', period_col.alias(period), 'ReturnRate')\
                                .groupBy('company_name', period).agg(max('ReturnRate').alias('ReturnRate'))

        return return_rate_df.orderBy('company_name', period)



    def analyze(self):
        try:
            self.read_and_validate_csv()
            print(f"Number of Observations: {self.get_number_of_observations()}")
            print("Basic Statistics:")
            self.get_basic_statistics().show()
            print("Null Values Count:")
            self.get_null_values_count().show()
            print(f"Most Common Time Interval: {self.deduce_adaptive_time_interval()}")

            print("Correlations between numerical columns:")
            correlation_matrix_df = self.compute_correlation_matrix()
            correlation_matrix_df.show(truncate=False)



            print("Weekly return rates")
            weekly_return_rates = analyzer.calculate_return_rate('week')
            weekly_return_rates.show()

            print("Monthly return rates")
            monthly_return_rates = analyzer.calculate_return_rate('month')
            monthly_return_rates.show()

            print("Yearly return rates")
            yearly_return_rates = analyzer.calculate_return_rate('year')
            yearly_return_rates.show()

        except ValueError as e:
            print(f"Error during analysis: {e}")

analyzer = StockDataAnalyzer('misc/GOOGLE.csv')
analyzer.analyze()

Number of Observations: 987
Basic Statistics:
+-------+------------------+------------------+------------------+------------------+------------------+------------------+------------+
|summary|              High|               Low|              Open|             Close|            Volume|         Adj Close|company_name|
+-------+------------------+------------------+------------------+------------------+------------------+------------------+------------+
|  count|               987|               987|               987|               987|               987|               987|         987|
|   mean|1175.6790605137292|1152.9178970166856|1163.8854444324065|1164.8144006854736|1634631.7335359675|1164.8144006854736|        NULL|
| stddev| 218.8235632110295| 212.2207173542823|214.97779187990264|215.44599608390612| 716425.6760312195|215.44599608390612|        NULL|
|    min| 789.6300048828125| 775.7999877929688| 778.8099975585938| 786.1400146484375|            285821| 786.1400146484375|      GOO

### <b><span style='color:#DEB078'></span><span style='color:#003f88'> Exploration process involves answering some questions</span></b>  



**What is the average of the opening and closing prices for each stock price and for different time periods (week, month, year) ?**

The code groups the data by time periods and calculates the average. This is useful for understanding volatility during open hours, and potentially for identifying patterns and anomalies.

In [16]:
def calculate_average_prices(df, time_period):
        if df is None:
            raise ValueError("DataFrame is not loaded.")

        if time_period not in ['week', 'month', 'year']:
            raise ValueError("time_period must be one of 'week', 'month', 'year'")

        time_period_func = weekofyear if time_period == 'week' else month if time_period == 'month' else year

        avg_prices_df = df.groupBy('company_name', time_period_func(col('Date')).alias(time_period))\
                               .agg(avg('Open').alias('Avg_Open'),
                                    avg('Close').alias('Avg_Close'))\
                               .orderBy('company_name', time_period)

        return avg_prices_df

print("Average opening and closing prices per week:")
calculate_average_prices(df_all,'week').show()

print("Average opening and closing prices per month:")
calculate_average_prices(df_all,'month').show()

print("Average opening and closing prices per year:")
calculate_average_prices(df_all,'year').show()

Average opening and closing prices per week:
+------------+----+------------------+------------------+
|company_name|week|          Avg_Open|         Avg_Close|
+------------+----+------------------+------------------+
|      AMAZON|   1|1332.7837524414062|1343.9818687438965|
|      AMAZON|   2|1397.6705047607422| 1404.330502319336|
|      AMAZON|   3|1457.4005567762588|1452.9288736979167|
|      AMAZON|   4|1393.6305474175347| 1394.582787407769|
|      AMAZON|   5| 1448.943505859375|1446.4424987792968|
|      AMAZON|   6|1471.8034973144531| 1469.330499267578|
|      AMAZON|   7|1503.5844940185548|1509.8065124511718|
|      AMAZON|   8|1525.7893753051758|1526.3581161499023|
|      AMAZON|   9|1485.8975036621093|1486.8674896240234|
|      AMAZON|  10| 1495.294989013672| 1497.688998413086|
|      AMAZON|  11| 1480.216989135742|1480.2699981689452|
|      AMAZON|  12|1492.1164978027343| 1496.715509033203|
|      AMAZON|  13| 1509.501050447163|1512.3126284950658|
|      AMAZON|  14|1513.170

**How do the stock prices change day to day and month to month ?**

The function calculates the day-to-day or month-to-month change in stock price by subtracting the closing price of the previous day or month from the closing price of the current day or month. While the previous function was useful for understanding volatility during open hours, this one is helpful for comprehending volatility over the course of a full day or month.

In [17]:
def calculate_day_to_day_changes(df):
        if df is None:
            raise ValueError("DataFrame is not loaded.")

        windowSpec = Window.partitionBy('company_name').orderBy('Date')

        day_change_df = df.withColumn('PrevClose', lag('Close').over(windowSpec))\
                               .withColumn('DayChange', col('Close') - col('PrevClose'))\
                               .select('Date', 'company_name', 'DayChange')

        return day_change_df

def calculate_month_to_month_changes(df):
    if df is None:
        raise ValueError("DataFrame is not loaded.")

    df = df.withColumn('LastDayOfMonth', last_day(col('Date')))

    windowSpec = Window.partitionBy('company_name', year(col('Date')), month(col('Date'))).orderBy(desc('Date'))

    ranked_df = df.withColumn('Rank', rank().over(windowSpec))

    last_day_of_month_df = ranked_df.filter(col('Rank') == 1)

    lag_windowSpec = Window.partitionBy('company_name').orderBy('LastDayOfMonth')

    month_change_df = last_day_of_month_df.withColumn('PrevMonthClose', lag('Close').over(lag_windowSpec))\
                                          .withColumn('MonthChange', col('Close') - col('PrevMonthClose'))\
                                          .select('Date', 'company_name', 'MonthChange')\
                                          .filter(col('MonthChange').isNotNull())\
                                          .orderBy('company_name', 'Date')

    return month_change_df

print("Day-to-day stock price changes:")
day_to_day_changes_df = calculate_day_to_day_changes(df_all)
day_to_day_changes_df.show()

print("Month-to-month stock price changes:")
month_to_month_changes_df = calculate_month_to_month_changes(df_all)
month_to_month_changes_df.show()

Day-to-day stock price changes:
+----------+------------+-----------------+
|      Date|company_name|        DayChange|
+----------+------------+-----------------+
|2017-01-03|      AMAZON|             NULL|
|2017-01-04|      AMAZON|   3.510009765625|
|2017-01-05|      AMAZON|   23.27001953125|
|2017-01-06|      AMAZON|15.53997802734375|
|2017-01-09|      AMAZON| 0.92999267578125|
|2017-01-10|      AMAZON|-1.01995849609375|
|2017-01-11|      AMAZON|  3.1199951171875|
|2017-01-12|      AMAZON| 14.6199951171875|
|2017-01-13|      AMAZON|              3.5|
|2017-01-17|      AMAZON| -7.4200439453125|
|2017-01-18|      AMAZON|  -2.239990234375|
|2017-01-19|      AMAZON| 1.55999755859375|
|2017-01-20|      AMAZON|    -0.7099609375|
|2017-01-23|      AMAZON| 9.54998779296875|
|2017-01-24|      AMAZON| 4.55999755859375|
|2017-01-25|      AMAZON|14.08001708984375|
|2017-01-26|      AMAZON|  2.6300048828125|
|2017-01-27|      AMAZON| -3.3800048828125|
|2017-01-30|      AMAZON| -5.3900146484375|


**Based on the opening and closing price, calculate the daily return of each stock ?**

The function calculates the daily return for each stock using the formula ( Close − Open ) / Open. This formula determines the percentage change in the stock price from the opening to the closing of the trading day.

It is a good measure of daily performance

In [18]:
def calculate_daily_returns(df):
        if df is None:
            raise ValueError("DataFrame is not loaded.")

        df = df.withColumn('DailyReturn', (col('Close') - col('Open')) / col('Open'))

        return df.select('Date', 'company_name', 'Open', 'Close', 'DailyReturn')
print("Daily stock returns:")
daily_returns_df = calculate_daily_returns(df_all)
daily_returns_df.show()

Daily stock returns:
+----------+------------+-----------------+-----------------+--------------------+
|      Date|company_name|             Open|            Close|         DailyReturn|
+----------+------------+-----------------+-----------------+--------------------+
|2017-01-03|      GOOGLE|778.8099975585938|786.1400146484375|0.009411816890925667|
|2017-01-04|      GOOGLE|788.3599853515625|786.9000244140625|-0.00185189629690...|
|2017-01-05|      GOOGLE|786.0800170898438|  794.02001953125|0.010100755990212075|
|2017-01-06|      GOOGLE| 795.260009765625|806.1500244140625|0.013693653037636018|
|2017-01-09|      GOOGLE|806.4000244140625|806.6500244140625|3.100198318838746E-4|
|2017-01-10|      GOOGLE|807.8599853515625|804.7899780273438|-0.00380017252975...|
|2017-01-11|      GOOGLE|            805.0|807.9099731445312|0.003614873471467...|
|2017-01-12|      GOOGLE|807.1400146484375|806.3599853515625|-9.66411381815525...|
|2017-01-13|      GOOGLE|  807.47998046875|807.8800048828125|4.953

**Calculate the average daily return for different periods (week, month, and year)?**

By using calculate_daily_returns, we calculate its average over various periods.

In [19]:
def calculate_daily_returns_df(df):
        if df is None:
            raise ValueError("DataFrame is not loaded.")

        df = df.withColumn('DailyReturn', (col('Close') - col('Open')) / col('Open'))
        return df

def calculate_average_daily_returns(df, period):
    if df is None:
        raise ValueError("DataFrame is not loaded.")

    if 'DailyReturn' not in df.columns:
        df = calculate_daily_returns_df(df)

    if period == 'week':
        period_column = weekofyear(col('Date'))
    elif period == 'month':
        period_column = month(col('Date'))
    elif period == 'year':
        period_column = year(col('Date'))
    else:
        raise ValueError("Period must be 'week', 'month', or 'year'.")

    avg_daily_returns_df = df.groupBy('company_name', period_column.alias(period))\
                                  .agg(avg('DailyReturn').alias('AvgDailyReturn'))\
                                  .orderBy('company_name', period_column)

    return avg_daily_returns_df

print("Average daily return per week:")
avg_daily_returns_week_df = calculate_average_daily_returns(df_all, 'week')
avg_daily_returns_week_df.show()

print("Average daily return per month:")
avg_daily_returns_month_df = calculate_average_daily_returns(df_all, 'month')
avg_daily_returns_month_df.show()

print("Average daily return per year:")
avg_daily_returns_year_df = calculate_average_daily_returns(df_all, 'year')
avg_daily_returns_year_df.show()

Average daily return per week:
+------------+----+--------------------+
|company_name|week|      AvgDailyReturn|
+------------+----+--------------------+
|      AMAZON|   1|0.009011415507826408|
|      AMAZON|   2|0.005298905854681818|
|      AMAZON|   3|-0.00346769040398...|
|      AMAZON|   4|0.001936857279947...|
|      AMAZON|   5|-0.00132025186267...|
|      AMAZON|   6|-0.00154018140736...|
|      AMAZON|   7|0.004635445350751888|
|      AMAZON|   8|8.221081842109969E-4|
|      AMAZON|   9|  8.5997797147561E-4|
|      AMAZON|  10|0.001685723299730...|
|      AMAZON|  11|-8.14364621083692...|
|      AMAZON|  12|0.002489068388508...|
|      AMAZON|  13| 0.00290151812871754|
|      AMAZON|  14|5.341417667250773E-4|
|      AMAZON|  15|2.007933707042426...|
|      AMAZON|  16|0.009485425702611001|
|      AMAZON|  17|-0.00595182535205...|
|      AMAZON|  18|-0.00170620301373...|
|      AMAZON|  19|0.002397471561113...|
|      AMAZON|  20|0.001782832466543...|
+------------+----+-------

**What are the stocks with the highest daily return?**

Every day, we will identify the company with the highest return for that day. Since we are using df_all, it's clear that the company with the highest daily return isn't the same every day.




In [20]:
from pyspark.sql.functions import max as spark_max
def max_daily_return(df):

  df = df.withColumn('DailyReturn', (col('Close') - col('Open')) / col('Open'))

  windowSpec = Window.partitionBy('Date')

  highest_daily_returns = df.withColumn('MaxDailyReturn', spark_max('DailyReturn').over(windowSpec)) \
                            .where(col('DailyReturn') == col('MaxDailyReturn')) \
                            .select('Date', 'company_name', 'DailyReturn')

  return highest_daily_returns
max_daily_return(df_all).show()

+----------+------------+--------------------+
|      Date|company_name|         DailyReturn|
+----------+------------+--------------------+
|2017-01-03|       TESLA|0.009913401436610085|
|2017-01-04|       TESLA| 0.05699646108009665|
|2017-01-05|      AMAZON|0.024817838247015465|
|2017-01-06|    FACEBOOK| 0.02008596658744028|
|2017-01-09|    FACEBOOK|0.010926737683329314|
|2017-01-10|       APPLE|0.002862709243875...|
|2017-01-11|    FACEBOOK|0.013992745356821603|
|2017-01-12|      AMAZON| 0.01665606718709997|
|2017-01-13|       TESLA| 0.03369563558827276|
|2017-01-17|       APPLE|0.014027410118973172|
|2017-01-18|       TESLA|0.007225840708306964|
|2017-01-19|       APPLE|0.003182556520617...|
|2017-01-20|   MICROSOFT|0.001117017896390701|
|2017-01-23|      GOOGLE|0.014939606761961908|
|2017-01-24|       TESLA| 0.01844001770019531|
|2017-01-25|      AMAZON|0.012993668837612066|
|2017-01-26|    FACEBOOK|0.008736563502433894|
|2017-01-27|       TESLA|0.006245508193298224|
|2017-01-30| 

In the situation where we need to explore to answers some questions about the data, the best way is to create a class names StockDataQuestions.

o What is the average of the opening and closing prices for each stock price and for different time periods (week, month, year)

o How do the stock prices change day to day and month to month

o Based on the opening and closing price, calculate the daily return of each stock

o What are the stocks with the highest daily return.

o Calculate the average daily return for different periods (week, month, and year)



In [21]:
class StockDataQuestions(StockDataAnalyzer):
    def __init__(self, file_path):
        super().__init__(file_path)

    def calculate_average_prices(self, time_period):
        if self.df is None:
            raise ValueError("DataFrame is not loaded.")

        if time_period not in ['week', 'month', 'year']:
            raise ValueError("time_period must be one of 'week', 'month', 'year'")

        time_period_func = weekofyear if time_period == 'week' else month if time_period == 'month' else year

        avg_prices_df = self.df.groupBy('company_name', time_period_func(col('Date')).alias(time_period))\
                               .agg(avg('Open').alias('Avg_Open'),
                                    avg('Close').alias('Avg_Close'))\
                               .orderBy('company_name', time_period)

        return avg_prices_df

    def calculate_day_to_day_changes(self):
        if self.df is None:
            raise ValueError("DataFrame is not loaded.")

        windowSpec = Window.partitionBy('company_name').orderBy('Date')

        day_change_df = self.df.withColumn('PrevClose', lag('Close').over(windowSpec))\
                               .withColumn('DayChange', col('Close') - col('PrevClose'))\
                               .select('Date', 'company_name', 'DayChange')

        return day_change_df

    def calculate_month_to_month_changes(self):
        if self.df is None:
            raise ValueError("DataFrame is not loaded.")

        self.df = self.df.withColumn('LastDayOfMonth', last_day(col('Date')))

        windowSpec = Window.partitionBy('company_name', year(col('Date')), month(col('Date'))).orderBy(desc('Date'))

        ranked_df = self.df.withColumn('Rank', rank().over(windowSpec))

        last_day_of_month_df = ranked_df.filter(col('Rank') == 1)

        lag_windowSpec = Window.partitionBy('company_name').orderBy('LastDayOfMonth')

        month_change_df = last_day_of_month_df.withColumn('PrevMonthClose', lag('Close').over(lag_windowSpec))\
                                              .withColumn('MonthChange', col('Close') - col('PrevMonthClose'))\
                                              .select('Date', 'company_name', 'MonthChange')\
                                              .filter(col('MonthChange').isNotNull())\
                                              .orderBy('company_name', 'Date')

        return month_change_df

    def calculate_daily_returns(self):
        if self.df is None:
            raise ValueError("DataFrame is not loaded.")

        self.df = self.df.withColumn('DailyReturn', (col('Close') - col('Open')) / col('Open'))

        return self.df.select('Date', 'company_name', 'Open', 'Close', 'DailyReturn')

    def max_daily_return(self):
        self.df = self.df.withColumn('DailyReturn', (col('Close') - col('Open')) / col('Open'))

        windowSpec = Window.partitionBy('Date')

        highest_daily_returns = self.df.withColumn('MaxDailyReturn', spark_max('DailyReturn').over(windowSpec)) \
                                  .where(col('DailyReturn') == col('MaxDailyReturn')) \
                                  .select('Date', 'company_name', 'DailyReturn')

        return highest_daily_returns

    def calculate_daily_returns_df(self):
        if self.df is None:
            raise ValueError("DataFrame is not loaded.")

        self.df = self.df.withColumn('DailyReturn', (col('Close') - col('Open')) / col('Open'))
        return self.df

    def calculate_average_daily_returns(self, period):
        if self.df is None:
            raise ValueError("DataFrame is not loaded.")

        if 'DailyReturn' not in self.df.columns:
            self.calculate_daily_returns_df()

        if period == 'week':
            period_column = weekofyear(col('Date'))
        elif period == 'month':
            period_column = month(col('Date'))
        elif period == 'year':
            period_column = year(col('Date'))
        else:
            raise ValueError("Period must be 'week', 'month', or 'year'.")

        avg_daily_returns_df = self.df.groupBy('company_name', period_column.alias(period))\
                                      .agg(avg('DailyReturn').alias('AvgDailyReturn'))\
                                      .orderBy('company_name', period_column)

        return avg_daily_returns_df


    def analyze(self):
        try:
            self.read_and_validate_csv()

            print("Average opening and closing prices per week:")
            self.calculate_average_prices('week').show()

            print("Average opening and closing prices per month:")
            self.calculate_average_prices('month').show()

            print("Average opening and closing prices per year:")
            self.calculate_average_prices('year').show()

            print("Day-to-day stock price changes:")
            day_to_day_changes_df = self.calculate_day_to_day_changes()
            day_to_day_changes_df.show()

            print("Month-to-month stock price changes:")
            month_to_month_changes_df = self.calculate_month_to_month_changes()
            month_to_month_changes_df.show()

            print("Daily stock returns:")
            daily_returns_df = self.calculate_daily_returns()
            daily_returns_df.show()

            print("Stocks with the highest daily return every day :")
            print(self.max_daily_return().show())

            print("Average daily return per week:")
            avg_daily_returns_week_df = self.calculate_average_daily_returns('week')
            avg_daily_returns_week_df.show()

            print("Average daily return per month:")
            avg_daily_returns_month_df = self.calculate_average_daily_returns('month')
            avg_daily_returns_month_df.show()

            print("Average daily return per year:")
            avg_daily_returns_year_df = self.calculate_average_daily_returns('year')
            avg_daily_returns_year_df.show()


        except ValueError as e:
            print(f"Error during analysis: {e}")

analyzer = StockDataQuestions('misc/GOOGLE.csv')
analyzer.analyze()

Average opening and closing prices per week:
+------------+----+------------------+------------------+
|company_name|week|          Avg_Open|         Avg_Close|
+------------+----+------------------+------------------+
|      GOOGLE|   1|1059.6124992370605|1067.4212532043457|
|      GOOGLE|   2|1094.9785034179688|1098.2729858398438|
|      GOOGLE|   3|1128.8416646321614|1131.7380506727432|
|      GOOGLE|   4| 1124.008324517144|1122.5127733018662|
|      GOOGLE|   5|1124.2879974365235| 1123.283743286133|
|      GOOGLE|   6| 1108.561505126953|1109.4344940185547|
|      GOOGLE|   7|1125.6254913330079|1130.5980072021484|
|      GOOGLE|   8|1140.8043594360352|1141.1831283569336|
|      GOOGLE|   9|1107.8075012207032|1106.2390075683593|
|      GOOGLE|  10|1108.0759979248046|1112.3220153808593|
|      GOOGLE|  11|1097.5137481689453|1097.7510009765624|
|      GOOGLE|  12| 1055.061505126953|1051.6089996337892|
|      GOOGLE|  13|1036.3100071957238|1036.1068372224506|
|      GOOGLE|  14|1043.368

### <b><span style='color:#DEB078'></span><span style='color:#003f88'> Moving average</span></b>  

The moving average is calculated by adding a stock's prices over a certain
period and dividing the sum by the total number of periods.

In trading, moving averages are used to generate signals for buying or selling stocks. For instance, when a stock price crosses its moving average upwards, it could be seen as a buy signal

In [22]:
def add_moving_average(df, column_name, num_points):
        """
        Add a column with the moving average of the specified column.

        Parameters:
        column_name (str): The name of the column to calculate the moving average for.
        num_points (int): The number of points to consider for the moving average.
        """
        if df is None:
            raise ValueError("DataFrame is not loaded.")

        windowSpec = Window.partitionBy('company_name').orderBy('Date').rowsBetween(-num_points + 1, 0)

        moving_avg_col_name = f"{column_name}_MA{num_points}"
        df = df.withColumn(moving_avg_col_name, F.avg(col(column_name)).over(windowSpec))

        return df

print("Adding moving average for the 'Open' column over 5 periods:")
add_moving_average(df_facebook,'Open', 5).show()

Adding moving average for the 'Open' column over 5 periods:
+----------+------------------+------------------+------------------+------------------+--------+------------------+------------+------------------+
|      Date|              High|               Low|              Open|             Close|  Volume|         Adj Close|company_name|          Open_MA5|
+----------+------------------+------------------+------------------+------------------+--------+------------------+------------+------------------+
|2017-01-03|117.83999633789062|115.51000213623047|116.02999877929688|116.86000061035156|20663900|116.86000061035156|    FACEBOOK|116.02999877929688|
|2017-01-04|119.66000366210938|117.29000091552734|117.55000305175781|118.69000244140625|19630900|118.69000244140625|    FACEBOOK|116.79000091552734|
|2017-01-05|120.94999694824219|118.31999969482422|118.86000061035156|120.66999816894531|19492200|120.66999816894531|    FACEBOOK|117.48000081380208|
|2017-01-06|123.87999725341797|120.02999877929

### <b><span style='color:#DEB078'></span><span style='color:#003f88'> Correlation stocks</span></b>  


We need to create a correlation function between 2 stocks.  'correlation_stocks' calculates the correlation matrix between two specified stocks based on their numerical metrics and then we use a loop to calculate and display the correlation matrices for each pair of stocks from a specified list of stocks.

This approach systematically computes and displays the correlation matrices for each combination of stock pairs from the given list.
The output would be a series of correlation matrices, each showing how various numeric metrics of one stock correlate with those of another, for all possible pairs in the provided list.

This can be particularly useful for financial analysis, where understanding the relationship between different stocks is crucial for portfolio management and risk assessment.

In [23]:
def correlation_stocks(df, stock1, stock2):
    """
    Calculate the correlation between two stocks based on a specified metric.

    Parameters:
    df (Dataset): Dataset with all stocks
    stock1 (str): Identifier for the first stock (e.g., company name or stock symbol).
    stock2 (str): Identifier for the second stock.
    """

    if df is None:
        raise ValueError("DataFrame is not loaded.")

    if not (df.filter(col('company_name').contains(stock1)) and df.filter(col('company_name').contains(stock2))):
        raise ValueError("One or both of the stock names do not exist in the DataFrame.")

    # Filter data for the two stocks
    df_stock1 = df.filter(df['company_name'] == stock1).drop('company_name')
    df_stock2 = df.filter(df['company_name'] == stock2).drop('company_name')

    for column in df_stock1.columns:
        if column != 'Date':
            df_stock1 = df_stock1.withColumnRenamed(column, column + f'_{stock1}')
    for column in df_stock2.columns:
        if column != 'Date':
            df_stock2 = df_stock2.withColumnRenamed(column, column + f'_{stock2}')

    # Join the data on date
    df_combined = df_stock1.join(df_stock2, on='Date', how='inner')

    # Sélection des colonnes numériques
    numeric_cols_1 = [c for c in df_stock1.columns if isinstance(df_stock1.schema[c].dataType, (DoubleType, IntegerType)) and c != 'Date']
    numeric_cols_2 = [c for c in df_stock2.columns if isinstance(df_stock2.schema[c].dataType, (DoubleType, IntegerType)) and c != 'Date']

    # Calculer la corrélation
    correlation_matrix = []
    for col1 in numeric_cols_1:
        row = {'Metric': col1}
        for col2 in numeric_cols_2:
            row[col2] = df_combined.stat.corr(col1, col2)
        correlation_matrix.append(row)

    # Créer la DataFrame pour la matrice de corrélation
    correlation_matrix_df = spark.createDataFrame(correlation_matrix)
    ordered_columns = ['Metric'] + numeric_cols_2
    correlation_matrix_df = correlation_matrix_df.select(*ordered_columns)

    return correlation_matrix_df

stocks = ['AMAZON', 'APPLE', 'FACEBOOK', 'GOOGLE', 'MICROSOFT', 'TESLA', 'ZOOM']

# Boucle pour calculer la corrélation pour chaque paire de stocks
for i in range(len(stocks)):
    for j in range(i + 1, len(stocks)):
        stock1 = stocks[i]
        stock2 = stocks[j]

        # Calculer la matrice de corrélation
        correlation_matrix_df = correlation_stocks(df_all, stock1, stock2)

        # Afficher les résultats
        print(f"Correlation between {stock1} and {stock2}:")
        correlation_matrix_df.show(truncate=False)


Correlation between AMAZON and APPLE:
+----------------+-------------------+--------------------+-------------------+--------------------+-------------------+-------------------+
|Metric          |High_APPLE         |Low_APPLE           |Open_APPLE         |Close_APPLE         |Volume_APPLE       |Adj Close_APPLE    |
+----------------+-------------------+--------------------+-------------------+--------------------+-------------------+-------------------+
|High_AMAZON     |0.9297874259390847 |0.9288404935316446  |0.929224429531747  |0.9289226688309318  |0.1614264488200611 |0.931117345769924  |
|Low_AMAZON      |0.9294814062048777 |0.9294547100635051  |0.9292104679328616 |0.929201231245745   |0.14280092623787832|0.9313750153531389 |
|Open_AMAZON     |0.9295864367635821 |0.9288996867205574  |0.929432373427345  |0.9286513439934997  |0.15477270907314372|0.9308299253600888 |
|Close_AMAZON    |0.9293612105453593 |0.928970764552425   |0.9287589351197002 |0.9292860084204524  |0.15015419902613

### <b><span style='color:#DEB078'></span><span style='color:#003f88'> Best stock return rate</span></b>  

We also want to know what is the beset return rate between all the stocks given a specific month or year.

The best_stock_return_rate function in the provided script calculates and identifies the stock with the highest return rate for a specific month or year within a given DataFrame (df).

The function is called with df_all (the DataFrame containing stock data), a start date ('2017-01-01'), and the period ('month').
It should output the company with the highest return rate for January 2017.

In [24]:
def best_stock_return_rate(df, start_date_str, period):
    """
    Find the stock with the best return rate for a given month or year.

    Parameters:
    df (DataFrame): The DataFrame containing the stock data.
    start_date_str (str): The start date in 'YYYY-MM-DD' format.
    period (str): The period for which to calculate the return rate. Must be 'month' or 'year'.
    """
    if df is None:
        raise ValueError("DataFrame is not loaded.")

    start_date = lit(start_date_str).cast(DateType())

    # Filter DataFrame based on the start date and period
    if period == 'month':
        filtered_df = df.filter((year(col('Date')) == year(start_date)) & (month(col('Date')) == month(start_date)))
    elif period == 'year':
        filtered_df = df.filter(year(col('Date')) == year(start_date))
    else:
        raise ValueError("Period must be 'month' or 'year'.")

    if filtered_df.count() == 0:
        raise ValueError("No data available for the specified period.")

    # Define window specification
    windowSpec = Window.partitionBy('company_name').orderBy('Date')
    windowSpecUnbounded = windowSpec.rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)

    # Calculate return rate
    return_rate_df = filtered_df.withColumn('FirstPrice', first(col('Close')).over(windowSpec))\
                               .withColumn('LastPrice', last(col('Close')).over(windowSpecUnbounded))\
                               .withColumn('ReturnRate', (col('LastPrice') - col('FirstPrice')) / col('FirstPrice'))\
                               .select('company_name', 'ReturnRate')

    # Find the best return rate
    best_return_rate_df = return_rate_df.groupBy('company_name').agg(max('ReturnRate').alias('MaxReturnRate'))\
                                        .orderBy(col('MaxReturnRate').desc())

    return best_return_rate_df.limit(1)

best_stock = best_stock_return_rate(df_all, '2017-01-01', 'month')
best_stock.show()

+------------+------------------+
|company_name|     MaxReturnRate|
+------------+------------------+
|       TESLA|0.1610213136260853|
+------------+------------------+



In the situation where we need to analyse between all the stocks, the best way is to create a class names StocksAnalyzer that regroup all the stocks and we get the correlations and the best return rate among stocks for a period.

In [25]:
from pyspark.sql.functions import corr
from pyspark.sql.functions import lit
from pyspark.sql.functions import weekofyear, month, year, avg
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import lag, datediff, to_date, last_day, months_between
from pyspark.sql.functions import rank, desc, first, last, min, max
from pyspark.sql.window import Window

class StocksAnalyzer:
    def __init__(self, files_path):
        self.spark = SparkSession.builder.appName('StockAnalysis').getOrCreate()
        self.files_path = files_path
        self.df = None

    def read_and_validate_csv(self):
        expected_schema = StructType([
            StructField("Date", DateType(), True),
            StructField("High", DoubleType(), True),
            StructField("Low", DoubleType(), True),
            StructField("Open", DoubleType(), True),
            StructField("Close", DoubleType(), True),
            StructField("Volume", DoubleType(), True),
            StructField("Adj Close", DoubleType(), True),
            StructField("company_name", StringType(), True)
        ])

        dataframes = []
        for file_path in self.files_path:
            df = self.spark.read.csv(f"misc/{file_path}", header=True, inferSchema=True)

            # Convert 'Volume' to DoubleType regardless of its original type
            df = df.withColumn("Volume", col("Volume").cast(DoubleType()))

            if len(df.columns) != len(expected_schema) or \
               not all([f.dataType == df.schema[i].dataType for i, f in enumerate(expected_schema)]):
                raise ValueError(f"CSV file schema in {file_path} does not match the expected schema.")

            # Rename columns to match expected schema
            for i, field in enumerate(expected_schema):
                df = df.withColumnRenamed(df.columns[i], field.name)

            dataframes.append(df)

        # Merge all dataframes into one
        self.df = dataframes[0]
        for dataframe in dataframes[1:]:
            self.df = self.df.unionByName(dataframe)

    def correlation_stocks(self, df, stock1, stock2):
        """
        Calculate the correlation between two stocks based on a specified metric.

        Parameters:
        df (Dataset): Dataset with all stocks
        stock1 (str): Identifier for the first stock (e.g., company name or stock symbol).
        stock2 (str): Identifier for the second stock.
        """

        if df is None:
            raise ValueError("DataFrame is not loaded.")

        if not (df.filter(col('company_name').contains(stock1)) and df.filter(col('company_name').contains(stock2))):
            raise ValueError("One or both of the stock names do not exist in the DataFrame.")

        # Filter data for the two stocks
        df_stock1 = df.filter(df['company_name'] == stock1).drop('company_name')
        df_stock2 = df.filter(df['company_name'] == stock2).drop('company_name')

        for column in df_stock1.columns:
            if column != 'Date':
                df_stock1 = df_stock1.withColumnRenamed(column, column + f'_{stock1}')
        for column in df_stock2.columns:
            if column != 'Date':
                df_stock2 = df_stock2.withColumnRenamed(column, column + f'_{stock2}')

        # Join the data on date
        df_combined = df_stock1.join(df_stock2, on='Date', how='inner')

        # Sélection des colonnes numériques
        numeric_cols_1 = [c for c in df_stock1.columns if isinstance(df_stock1.schema[c].dataType, (DoubleType, IntegerType)) and c != 'Date']
        numeric_cols_2 = [c for c in df_stock2.columns if isinstance(df_stock2.schema[c].dataType, (DoubleType, IntegerType)) and c != 'Date']

        # Calculer la corrélation
        correlation_matrix = []
        for col1 in numeric_cols_1:
            row = {'Metric': col1}
            for col2 in numeric_cols_2:
                row[col2] = df_combined.stat.corr(col1, col2)
            correlation_matrix.append(row)

        # Créer la DataFrame pour la matrice de corrélation
        correlation_matrix_df = spark.createDataFrame(correlation_matrix)
        ordered_columns = ['Metric'] + numeric_cols_2
        correlation_matrix_df = correlation_matrix_df.select(*ordered_columns)

        return correlation_matrix_df


    def best_stock_return_rate(self, df, start_date_str, period):
        """
        Find the stock with the best return rate for a given month or year.

        Parameters:
        df (DataFrame): The DataFrame containing the stock data.
        start_date_str (str): The start date in 'YYYY-MM-DD' format.
        period (str): The period for which to calculate the return rate. Must be 'month' or 'year'.
        """
        if df is None:
            raise ValueError("DataFrame is not loaded.")

        start_date = lit(start_date_str).cast(DateType())

        # Filter DataFrame based on the start date and period
        if period == 'month':
            filtered_df = df.filter((year(col('Date')) == year(start_date)) & (month(col('Date')) == month(start_date)))
        elif period == 'year':
            filtered_df = df.filter(year(col('Date')) == year(start_date))
        else:
            raise ValueError("Period must be 'month' or 'year'.")

        if filtered_df.count() == 0:
            raise ValueError("No data available for the specified period.")

        # Define window specification
        windowSpec = Window.partitionBy('company_name').orderBy('Date')
        windowSpecUnbounded = windowSpec.rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)

        # Calculate return rate
        return_rate_df = filtered_df.withColumn('FirstPrice', first(col('Close')).over(windowSpec))\
                                  .withColumn('LastPrice', last(col('Close')).over(windowSpecUnbounded))\
                                  .withColumn('ReturnRate', (col('LastPrice') - col('FirstPrice')) / col('FirstPrice'))\
                                  .select('company_name', 'ReturnRate')

        # Find the best return rate
        best_return_rate_df = return_rate_df.groupBy('company_name').agg(max('ReturnRate').alias('MaxReturnRate'))\
                                            .orderBy(col('MaxReturnRate').desc())

        return best_return_rate_df.limit(1)

    def analyze(self):
        try:
            self.read_and_validate_csv()
            company_names_df = self.df.select("company_name").distinct()
            company_names_list = [row['company_name'] for row in company_names_df.collect()]
            for i in range(len(company_names_list)):
              for j in range(i + 1, len(company_names_list)):
                  stock1 = stocks[i]
                  stock2 = stocks[j]

                  # Calculer la matrice de corrélation
                  correlation_matrix_df = self.correlation_stocks(self.df, stock1, stock2)

                  # Afficher les résultats
                  print(f"Correlation between {stock1} and {stock2}:")
                  correlation_matrix_df.show(truncate=False)


        except ValueError as e:
            print(f"Error during analysis: {e}")


files_path = ['AMAZON.csv', 'APPLE.csv', 'FACEBOOK.csv', 'GOOGLE.csv', 'MICROSOFT.csv', 'TESLA.csv', 'ZOOM.csv']
analyzer = StocksAnalyzer(files_path)
analyzer.analyze()
best_stock = best_stock_return_rate(df_all, '2017-01-01', 'month')
best_stock.show()

Correlation between AMAZON and APPLE:
+----------------+-------------------+--------------------+-------------------+--------------------+-------------------+-------------------+
|Metric          |High_APPLE         |Low_APPLE           |Open_APPLE         |Close_APPLE         |Volume_APPLE       |Adj Close_APPLE    |
+----------------+-------------------+--------------------+-------------------+--------------------+-------------------+-------------------+
|High_AMAZON     |0.9297874259390847 |0.9288404935316446  |0.929224429531747  |0.9289226688309318  |0.1614264488200611 |0.931117345769924  |
|Low_AMAZON      |0.9294814062048777 |0.9294547100635051  |0.9292104679328616 |0.929201231245745   |0.14280092623787832|0.9313750153531389 |
|Open_AMAZON     |0.9295864367635821 |0.9288996867205574  |0.929432373427345  |0.9286513439934997  |0.15477270907314372|0.9308299253600888 |
|Close_AMAZON    |0.9293612105453593 |0.928970764552425   |0.9287589351197002 |0.9292860084204524  |0.15015419902613

## <b><div style='padding:15px;background-color:#003f88;color:white;border-radius:2px;font-size:110%;text-align: center'>3  |  useful Insights</div></b>

In [26]:
from pyspark.sql.functions import col, lag, when, avg, stddev, greatest, abs, exp
def window_spec(period):
    return Window.orderBy("Date").rowsBetween(-period + 1, 0)


### <b><span style='color:#DEB078'></span><span style='color:#003f88'> L'Average True Range (ATR)</span></b>  

L'ATR est un indicateur de volatilité qui montre à quel point le prix d'un actif varie en moyenne sur une période donnée. Il est couramment utilisé pour déterminer les seuils de stop-loss et les niveaux de prise de profit.

In [27]:
def average_true_range_pyspark(df, period=14):
    df = df.withColumn("High-Low", col("High") - col("Low"))
    df = df.withColumn("High-PrevClose", abs(col("High") - lag("Close", 1).over(Window.orderBy("Date"))))
    df = df.withColumn("Low-PrevClose", abs(col("Low") - lag("Close", 1).over(Window.orderBy("Date"))))
    df = df.withColumn("TR", greatest("High-Low", "High-PrevClose", "Low-PrevClose"))
    atr = df.withColumn("ATR", avg("TR").over(window_spec(period)))
    return atr
    
average_true_range_pyspark(df_single).show()

+----------+-----------------+-----------------+-----------------+-----------------+-------+-----------------+------------+-----------------+----------------+-----------------+-----------------+------------------+
|      Date|             High|              Low|             Open|            Close| Volume|        Adj Close|company_name|         High-Low|  High-PrevClose|    Low-PrevClose|               TR|               ATR|
+----------+-----------------+-----------------+-----------------+-----------------+-------+-----------------+------------+-----------------+----------------+-----------------+-----------------+------------------+
|2017-01-03|789.6300048828125|775.7999877929688|778.8099975585938|786.1400146484375|1657300|786.1400146484375|      GOOGLE|13.83001708984375|            NULL|             NULL|13.83001708984375| 13.83001708984375|
|2017-01-04|791.3400268554688|783.1599731445312|788.3599853515625|786.9000244140625|1073000|786.9000244140625|      GOOGLE|  8.1800537109375|5.2

### <b><span style='color:#DEB078'></span><span style='color:#003f88'> MACD (Moving Average Convergence Divergence)</span></b>  

Le MACD est un indicateur de tendance qui suit la relation entre deux moyennes mobiles des prix. Il est utilisé pour identifier les directions des tendances, les retournements et pour générer des signaux de trading.

In [28]:
def calculate_ema(column, window_period):
    ema = sum([(1 - 2 / (1 + window_period)) ** i * lag(column, i).over(Window.orderBy("Date"))
               for i in range(window_period)])
    return ema

def moving_average_convergence_divergence_pyspark(df, short_period=2, long_period=6):
    df = df.withColumn("Short_EMA", calculate_ema(col("Close"), short_period))
    df = df.withColumn("Long_EMA", calculate_ema(col("Close"), long_period))
    macd = df.withColumn("MACD", col("Short_EMA") - col("Long_EMA"))
    return macd


moving_average_convergence_divergence_pyspark(df_google).show()
moving_average_convergence_divergence_pyspark(df_microsoft).show()
moving_average_convergence_divergence_pyspark(df_zoom).show()
moving_average_convergence_divergence_pyspark(df_tesla).show()
moving_average_convergence_divergence_pyspark(df_amazon).show()
moving_average_convergence_divergence_pyspark(df_apple).show()
moving_average_convergence_divergence_pyspark(df_facebook).show()


+----------+-----------------+-----------------+-----------------+-----------------+-------+-----------------+------------+------------------+------------------+-------------------+
|      Date|             High|              Low|             Open|            Close| Volume|        Adj Close|company_name|         Short_EMA|          Long_EMA|               MACD|
+----------+-----------------+-----------------+-----------------+-----------------+-------+-----------------+------------+------------------+------------------+-------------------+
|2017-01-03|789.6300048828125|775.7999877929688|778.8099975585938|786.1400146484375|1657300|786.1400146484375|      GOOGLE|              NULL|              NULL|               NULL|
|2017-01-04|791.3400268554688|783.1599731445312|788.3599853515625|786.9000244140625|1073000|786.9000244140625|      GOOGLE|1048.9466959635417|              NULL|               NULL|
|2017-01-05|  794.47998046875|  785.02001953125|786.0800170898438|  794.02001953125|133520

### <b><span style='color:#DEB078'></span><span style='color:#003f88'> Le Relative Strength Index (RSI)</span></b>  

Le RSI est un indicateur de momentum qui mesure la vitesse et le changement des mouvements de prix. Il est souvent utilisé pour identifier les conditions de surachat ou de survent

In [43]:
def calculate_rsi(df, period=14):
    window = Window.partitionBy("company_name").orderBy("Date")
    df = df.withColumn("Daily_Change", F.col("Close") - F.lag(F.col("Close"), 1).over(window))

    df = df.withColumn("Gain", F.when(F.col("Daily_Change") > 0, F.col("Daily_Change")).otherwise(0))\
           .withColumn("Loss", F.when(F.col("Daily_Change") < 0, -F.col("Daily_Change")).otherwise(0))

    avg_gain = F.avg(F.col("Gain")).over(Window.partitionBy("company_name").orderBy("Date").rowsBetween(-period, -1))
    avg_loss = F.avg(F.col("Loss")).over(Window.partitionBy("company_name").orderBy("Date").rowsBetween(-period, -1))

    df = df.withColumn("Avg_Gain", avg_gain).withColumn("Avg_Loss", avg_loss)

    df = df.withColumn("RS", F.col("Avg_Gain") / F.col("Avg_Loss"))\
           .withColumn("RSI", 100 - (100 / (1 + F.col("RS"))))

    return df.select("Date", "company_name", "Close", "RSI")

df_amazon = spark.read.csv('misc/AMAZON.csv', header=True, inferSchema=True)

calculate_rsi(df_amazon).show()
calculate_rsi(df_microsoft).show()
calculate_rsi(df_zoom).show()
calculate_rsi(df_tesla).show()
calculate_rsi(df_apple).show()
calculate_rsi(df_facebook).show()
calculate_rsi(df_google).show()

+----------+------------+-----------------+-----------------+
|      Date|company_name|            Close|              RSI|
+----------+------------+-----------------+-----------------+
|2017-01-03|      AMAZON|753.6699829101562|             NULL|
|2017-01-04|      AMAZON|757.1799926757812|             NULL|
|2017-01-05|      AMAZON|780.4500122070312|             NULL|
|2017-01-06|      AMAZON| 795.989990234375|             NULL|
|2017-01-09|      AMAZON|796.9199829101562|             NULL|
|2017-01-10|      AMAZON|795.9000244140625|             NULL|
|2017-01-11|      AMAZON|  799.02001953125|97.69604822154115|
|2017-01-12|      AMAZON|813.6400146484375|97.84773265570806|
|2017-01-13|      AMAZON|817.1400146484375|98.35516958653348|
|2017-01-17|      AMAZON| 809.719970703125|98.44304794025985|
|2017-01-18|      AMAZON|  807.47998046875|88.42725450566373|
|2017-01-19|      AMAZON|809.0399780273438|85.79221085024582|
|2017-01-20|      AMAZON|808.3300170898438|86.08106947175503|
|2017-01

### <b><span style='color:#DEB078'></span><span style='color:#003f88'> bollinger_bands</span></b>  

Les bandes de Bollinger sont un indicateur de volatilité. Elles comportent une moyenne mobile (bande du milieu) et deux bandes externes qui sont des écarts-types au-dessus et en dessous de cette moyenne mobile. Ces bandes aident à identifier la volatilité du marché et les niveaux de surachat ou de survente.

In [44]:
def calculate_bollinger_bands(df, period=20, num_std_dev=2):
    window = Window.partitionBy("company_name").orderBy("Date").rowsBetween(-period + 1, Window.currentRow)
    df = df.withColumn("SMA", F.avg("Close").over(window))
    df = df.withColumn("StdDev", F.stddev("Close").over(window))

    df = df.withColumn("Upper_Band", F.col("SMA") + num_std_dev * F.col("StdDev"))
    df = df.withColumn("Lower_Band", F.col("SMA") - num_std_dev * F.col("StdDev"))

    return df.select("Date", "company_name", "Close", "SMA", "Upper_Band", "Lower_Band")

df_amazon = spark.read.csv('misc/AMAZON.csv', header=True, inferSchema=True)

calculate_bollinger_bands(df_amazon).show()
calculate_bollinger_bands(df_microsoft).show()
calculate_bollinger_bands(df_zoom).show()
calculate_bollinger_bands(df_tesla).show()
calculate_bollinger_bands(df_apple).show()
calculate_bollinger_bands(df_facebook).show()
calculate_bollinger_bands(df_google).show()


+----------+------------+-----------------+-----------------+-----------------+-----------------+
|      Date|company_name|            Close|              SMA|       Upper_Band|       Lower_Band|
+----------+------------+-----------------+-----------------+-----------------+-----------------+
|2017-01-03|      AMAZON|753.6699829101562|753.6699829101562|             NULL|             NULL|
|2017-01-04|      AMAZON|757.1799926757812|755.4249877929688|760.3888912075777|750.4610843783598|
|2017-01-05|      AMAZON|780.4500122070312|763.7666625976562|792.8754693598834|734.6578558354291|
|2017-01-06|      AMAZON| 795.989990234375|771.8224945068359|811.8627816944146|731.7822073192573|
|2017-01-09|      AMAZON|796.9199829101562|   776.8419921875| 818.149686239234|735.5342981357659|
|2017-01-10|      AMAZON|795.9000244140625|780.0183308919271|820.1082240305118|739.9284377533425|
|2017-01-11|      AMAZON|  799.02001953125|782.7328578404018|822.0476810847094|743.4180345960942|
|2017-01-12|      AM

### <b><span style='color:#DEB078'></span><span style='color:#003f88'> Price Oscillator</span></b>  

Le Price Oscillator est un indicateur de momentum similaire au MACD, mais exprimé en pourcentage. Il est utilisé pour identifier les tendances à long terme ou les inversions potentielles de tendance.

In [45]:
def calculate_price_oscillator(df, short_period=12, long_period=26):
    window_short = Window.partitionBy("company_name").orderBy("Date").rowsBetween(-short_period + 1, Window.currentRow)
    window_long = Window.partitionBy("company_name").orderBy("Date").rowsBetween(-long_period + 1, Window.currentRow)

    df = df.withColumn(f"SMA_{short_period}", F.avg("Close").over(window_short))
    df = df.withColumn(f"SMA_{long_period}", F.avg("Close").over(window_long))

    df = df.withColumn("PPO", (F.col(f"SMA_{short_period}") - F.col(f"SMA_{long_period}")) / F.col(f"SMA_{long_period}") * 100)

    return df.select("Date", "company_name", "Close", "PPO")

df_amazon = spark.read.csv('misc/AMAZON.csv', header=True, inferSchema=True)

calculate_price_oscillator(df_amazon).show()
calculate_price_oscillator(df_microsoft).show()
calculate_price_oscillator(df_zoom).show()
calculate_price_oscillator(df_tesla).show()
calculate_price_oscillator(df_apple).show()
calculate_price_oscillator(df_facebook).show()
calculate_price_oscillator(df_google).show()


+----------+------------+-----------------+------------------+
|      Date|company_name|            Close|               PPO|
+----------+------------+-----------------+------------------+
|2017-01-03|      AMAZON|753.6699829101562|               0.0|
|2017-01-04|      AMAZON|757.1799926757812|               0.0|
|2017-01-05|      AMAZON|780.4500122070312|               0.0|
|2017-01-06|      AMAZON| 795.989990234375|               0.0|
|2017-01-09|      AMAZON|796.9199829101562|               0.0|
|2017-01-10|      AMAZON|795.9000244140625|               0.0|
|2017-01-11|      AMAZON|  799.02001953125|               0.0|
|2017-01-12|      AMAZON|813.6400146484375|               0.0|
|2017-01-13|      AMAZON|817.1400146484375|               0.0|
|2017-01-17|      AMAZON| 809.719970703125|               0.0|
|2017-01-18|      AMAZON|  807.47998046875|               0.0|
|2017-01-19|      AMAZON|809.0399780273438|               0.0|
|2017-01-20|      AMAZON|808.3300170898438|0.4404685749

### <b><span style='color:#DEB078'></span><span style='color:#003f88'> On-Balance Volume (OBV) </span></b>  

L'OBV est un indicateur de momentum qui utilise le volume de transaction pour prédire des changements dans le prix de l'actif. Il est basé sur l'idée que les changements de volume précèdent souvent les changements de prix.

In [46]:
def calculate_obv(df):
    window = Window.partitionBy("company_name").orderBy("Date")
    df = df.withColumn("Prev_Close", F.lag("Close").over(window))

    df = df.withColumn("Volume_Direction", F.when(F.col("Close") > F.col("Prev_Close"), F.col("Volume"))
                                            .when(F.col("Close") < F.col("Prev_Close"), -F.col("Volume"))
                                            .otherwise(0))

    window = Window.partitionBy("company_name").orderBy("Date").rowsBetween(Window.unboundedPreceding, 0)
    df = df.withColumn("OBV", F.sum("Volume_Direction").over(window))

    return df.select("Date", "company_name", "Close", "Volume", "OBV")

df_amazon = spark.read.csv('misc/AMAZON.csv', header=True, inferSchema=True)

calculate_obv(df_amazon).show()
calculate_obv(df_microsoft).show()
calculate_obv(df_zoom).show()
calculate_obv(df_tesla).show()
calculate_obv(df_apple).show()
calculate_obv(df_facebook).show()
calculate_obv(df_google).show()


+----------+------------+-----------------+-------+--------+
|      Date|company_name|            Close| Volume|     OBV|
+----------+------------+-----------------+-------+--------+
|2017-01-03|      AMAZON|753.6699829101562|3521100|       0|
|2017-01-04|      AMAZON|757.1799926757812|2510500| 2510500|
|2017-01-05|      AMAZON|780.4500122070312|5830100| 8340600|
|2017-01-06|      AMAZON| 795.989990234375|5986200|14326800|
|2017-01-09|      AMAZON|796.9199829101562|3446100|17772900|
|2017-01-10|      AMAZON|795.9000244140625|2558400|15214500|
|2017-01-11|      AMAZON|  799.02001953125|2992800|18207300|
|2017-01-12|      AMAZON|813.6400146484375|4873900|23081200|
|2017-01-13|      AMAZON|817.1400146484375|3791900|26873100|
|2017-01-17|      AMAZON| 809.719970703125|3670500|23202600|
|2017-01-18|      AMAZON|  807.47998046875|2354200|20848400|
|2017-01-19|      AMAZON|809.0399780273438|2540800|23389200|
|2017-01-20|      AMAZON|808.3300170898438|3376200|20013000|
|2017-01-23|      AMAZON

### <b><span style='color:#DEB078'></span><span style='color:#003f88'> Volatility Index (VIX)</span></b>  

Cette fonction est utilisée pour estimer la volatilité d'un actif, souvent utile dans la gestion des risques et dans la stratégie de trading.

In [47]:
def calculate_volatility_approximation(df, period=30):
    window = Window.partitionBy("company_name").orderBy("Date")
    df = df.withColumn("Prev_Close", F.lag(F.col("Close")).over(window))
    df = df.withColumn("Daily_Return", (F.col("Close") - F.col("Prev_Close")) / F.col("Prev_Close"))

    window = Window.partitionBy("company_name").orderBy("Date").rowsBetween(-period + 1, Window.currentRow)
    df = df.withColumn("Rolling_StdDev", F.stddev(F.col("Daily_Return")).over(window))

    trading_days = 252
    df = df.withColumn("Annualized_Volatility", F.col("Rolling_StdDev") * F.sqrt(F.lit(trading_days)))

    return df.select("Date", "company_name", "Close", "Annualized_Volatility")

df_amazon = spark.read.csv('misc/AMAZON.csv', header=True, inferSchema=True)

calculate_volatility_approximation(df_amazon).show()
calculate_volatility_approximation(df_microsoft).show()
calculate_volatility_approximation(df_zoom).show()
calculate_volatility_approximation(df_tesla).show()
calculate_volatility_approximation(df_apple).show()
calculate_volatility_approximation(df_facebook).show()
calculate_volatility_approximation(df_google).show()


+----------+------------+-----------------+---------------------+
|      Date|company_name|            Close|Annualized_Volatility|
+----------+------------+-----------------+---------------------+
|2017-01-03|      AMAZON|753.6699829101562|                 NULL|
|2017-01-04|      AMAZON|757.1799926757812|                 NULL|
|2017-01-05|      AMAZON|780.4500122070312|   0.2926940333112125|
|2017-01-06|      AMAZON| 795.989990234375|  0.20796071160983765|
|2017-01-09|      AMAZON|796.9199829101562|  0.21820108476891262|
|2017-01-10|      AMAZON|795.9000244140625|   0.2183058068771333|
|2017-01-11|      AMAZON|  799.02001953125|   0.2006335646568875|
|2017-01-12|      AMAZON|813.6400146484375|   0.1900336120668114|
|2017-01-13|      AMAZON|817.1400146484375|  0.17997721603629122|
|2017-01-17|      AMAZON| 809.719970703125|  0.19689130168499352|
|2017-01-18|      AMAZON|  807.47998046875|  0.19343705519806126|
|2017-01-19|      AMAZON|809.0399780273438|  0.18509816776241056|
|2017-01-2

### <b><span style='color:#DEB078'></span><span style='color:#003f88'> Rate of change </span></b>  

Le ROC est un indicateur de momentum qui mesure le taux de changement du prix. Il est utilisé pour identifier la force d'une tendance.

In [48]:
def rate_of_change(df, period=10):
    roc = df.withColumn("Prev_Close", lag("Close", period).over(Window.orderBy("Date")))
    roc = roc.withColumn("ROC", ((col("Close") - col("Prev_Close")) / col("Prev_Close")) * 100)
    return roc.drop("Prev_Close")
rate_of_change(df_amazon).show()
rate_of_change(df_microsoft).show()
rate_of_change(df_zoom).show()
rate_of_change(df_tesla).show()
rate_of_change(df_apple).show()
rate_of_change(df_facebook).show()
rate_of_change(df_google).show()

+----------+-----------------+-----------------+-----------------+-----------------+-------+-----------------+------------+------------------+
|      Date|             High|              Low|             Open|            Close| Volume|        Adj Close|company_name|               ROC|
+----------+-----------------+-----------------+-----------------+-----------------+-------+-----------------+------------+------------------+
|2017-01-03| 758.760009765625|747.7000122070312|757.9199829101562|753.6699829101562|3521100|753.6699829101562|      AMAZON|              NULL|
|2017-01-04|759.6799926757812|754.2000122070312|758.3900146484375|757.1799926757812|2510500|757.1799926757812|      AMAZON|              NULL|
|2017-01-05|782.4000244140625| 760.260009765625|761.5499877929688|780.4500122070312|5830100|780.4500122070312|      AMAZON|              NULL|
|2017-01-06|799.4400024414062|  778.47998046875|782.3599853515625| 795.989990234375|5986200| 795.989990234375|      AMAZON|              NULL|

In [35]:
# Stop the Spark session
spark.stop()