In [1]:
#import findspark
#findspark.init()

import pyspark
import databricks.koalas as ks
pyspark.__file__  # display the path of the pyspark package actually imported



'C:\\Users\\Khalil\\Documents\\Projet_Efrei\\nasdaq_stock\\venv\\lib\\site-packages\\pyspark\\__init__.py'

## 1- Exploration

### Setting up spark environment

In [2]:
# Single-threaded local Spark session used by every cell below.
spark = (
    pyspark.sql.SparkSession.builder
    .master("local[1]")
    .appName("Nasdaq")
    .getOrCreate()
)

### Function to read files

The function `read(data_folder_path)` reads CSV files. The schema is inferred, except for the *Date* column, which is cast to `DateType` afterwards.

- `data_folder_path` is the path to the folder where csv files were placed.  
- PySpark reads every CSV file inside the input folder and merges them into a single dataframe.

We show the schema of the dataframe and its first 40 rows.

In [None]:
from pyspark.sql.functions import col
from pyspark.sql.types import DateType

def read(data_folder_path):
    """Load every CSV file found in *data_folder_path* into one Spark dataframe.

    The header row gives the column names, the other column types are
    inferred, and the 'Date' column is then cast to ``DateType``.

    Args:
        data_folder_path: folder (or path pattern) handed to ``spark.read.csv``.

    Returns:
        The loaded Spark dataframe.
    """
    csv_reader = spark.read.options(header='True', dateFormat='yyyy-MM-dd', inferSchema='True')
    loaded = csv_reader.csv(data_folder_path)
    return loaded.withColumn("Date", col("Date").cast(DateType()))

# Folder holding the input CSV files -- adjust to your local copy.
data_folder_path = "C:/Users/your/path/to/data"
dataframe = read(data_folder_path)
dataframe.printSchema()
dataframe.show(n=40)

### Basic Exploration

**Get the name of the stocks as a list**  
The function `get_names_stocks()` collects the names of the stocks present in the loaded dataframe into a list. This way, the code does not need to change every time a new stock is added to the analysis.

In [None]:
def get_names_stocks(df):
    """Return the distinct values of the 'company_name' column as a list.

    Reading the names from the data means a newly added stock is picked up
    without touching the code.
    """
    distinct_rows = df.select('company_name').distinct().collect()
    return [row['company_name'] for row in distinct_rows]

stock_names = get_names_stocks(dataframe)
for stock in stock_names:
    print(stock)

**Show the first and last 40 rows for each stock:**  
- `df.show(n)` shows the n first rows  
- To show the last n rows, the dataframe is ordered by date in a descending order.

In [None]:
def show_first_last_nrows(df, n=40):
    """Print the first and last *n* rows of every stock in chronological order.

    Spark does not guarantee any row order unless a sort is requested, so
    both previews are sorted explicitly: ascending for the first rows and
    descending for the last rows.

    Args:
        df: Spark dataframe with a 'company_name' column and either a 'Date'
            column or decomposed 'year'/'month' columns (as returned by
            month2month_change()).
        n: number of rows shown in each preview.
    """
    # Dataframes without 'Date' only carry the decomposed year/month values.
    sort_cols = ["Date"] if "Date" in df.columns else ["year", "month"]
    for name in get_names_stocks(df):
        stock_df = df.where(df.company_name == name)
        print("{} first rows for stock named {}:".format(n, name))
        stock_df.orderBy(sort_cols, ascending=True).show(n)
        print("\n")
        print("{} last rows for stock named {}:".format(n, name))
        stock_df.orderBy(sort_cols, ascending=False).show(n)
        print("\n")

show_first_last_nrows(dataframe, n=40)

**Get the number of observations, the period of values (day, week, month)**
- `get_num_days()`: The number of days is the number of distinct (year, day of year) pairs. The functions `year()` and `dayofyear()` decompose a date into its year and its day of the year, and the distinct pairs are counted with `count()`.
- `get_num_months()`: The number of months is calculated by counting the number of distinct values in the values of years and months. Years and months are extracted with the methods `year()` and `month()`
- `get_num_years()`: The number of years is calculated by counting the number of distinct values in the table of years selected with the method `year()`

In [None]:
from pyspark.sql.functions import year, month, dayofyear

def get_num_days(df):
    """Count the distinct (year, day-of-year) pairs, i.e. distinct days, in *df*."""
    day_keys = df.select(year('Date').alias('year'), dayofyear('Date').alias('day'))
    return day_keys.dropDuplicates().count()

def get_num_months(df):
    """Count the distinct (year, month) pairs present in *df*."""
    month_keys = df.select(year("Date").alias('year'), month("Date").alias('month'))
    return month_keys.dropDuplicates().count()

def get_num_years(df):
    """Count the distinct calendar years present in *df*."""
    year_keys = df.select(year("Date").alias('year'))
    return year_keys.dropDuplicates().count()


print("Number of observations:")
num_days = get_num_days(dataframe)
print(">>>>", num_days, "days")
num_months = get_num_months(dataframe)
print(">>>>", num_months, "months")
num_years = get_num_years(dataframe)
print(">>>>", num_years, "years")

**The dates of the first and last observation for each stock**  
- We iterate on every stock to get this information thanks to the method `get_names_stocks()`
- The first date value in the column *Date* is taken with the method `first()`
- The last date value is taken by taking the first value of the column *Date* in a reverse order

In [None]:
def get_boundary_dates(df):
    """Return the earliest and latest observation date of every stock.

    The bounds come from min/max aggregations. ``first()`` on an unsorted
    dataframe returns an arbitrary row, so it is not guaranteed to be the
    earliest date. A single grouped aggregation also avoids running two
    Spark jobs per stock.

    Args:
        df: Spark dataframe with 'company_name' and 'Date' columns.

    Returns:
        dict mapping each company name to a ``(first_date, last_date)`` tuple.
    """
    from pyspark.sql import functions as F

    rows = (df.groupBy("company_name")
              .agg(F.min("Date").alias("first_date"), F.max("Date").alias("last_date"))
              .collect())
    return {row["company_name"]: (row["first_date"], row["last_date"]) for row in rows}

boundary_dates = get_boundary_dates(dataframe)
for stock_name, (first_date, last_date) in boundary_dates.items():
    print("First and last observation date for {} stock: {}  -  {}".format(stock_name, first_date, last_date))

**Descriptive Statistics: min, max, mean, standard deviation for each stock**  
We show descriptive statistics for each stock. Descriptive statistics are extracted with the method `describe()`

In [None]:
def show_statistics(df):
    """Print the ``describe()`` summary of every stock, one table per stock."""
    for stock in get_names_stocks(df):
        stock_df = df.where(df.company_name == stock)
        print("Descriptive statistics for", stock)
        stock_df.describe().show()
        print("\n")
    

show_statistics(df=dataframe)

**Number of missing values**  
We show the number of missing values for each stock.
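The function `isnull()` is used to check whether a value is null (a missing value). Note that it does not flag `NaN` values in floating-point columns; `isnan()` would be needed for those.  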
No missing values were found in the original data.

In [None]:
from pyspark.sql.functions import isnull, when, count, col

def show_num_missing_val(df):
    """Print, for every stock, the number of null values in each column.

    Note: ``isnull`` only detects nulls; NaN values in floating-point columns
    are not counted.
    """
    for name in get_names_stocks(df):
        # The header previously read "Descriptive statistics for", copied from
        # show_statistics(); this cell reports missing-value counts.
        print("Missing values for", name)
        stock_df = df.where(df.company_name == name)
        # count() only counts non-null values and when() is non-null exactly
        # where column c is null, so each aggregate is that column's null count.
        stock_df.select([count(when(isnull(c), True)).alias(c) for c in df.columns]).show()
        print("\n")

show_num_missing_val(df=dataframe)

**Correlation between values for each stock**  
The correlation is computed by converting the spark dataframe to a koalas dataframe with the method `to_koalas()`  
The correlation is then calculated with the method `corr()`  
The columns *High, Low, Open, Close, Adj Close* are highly correlated, which was expected.

In [None]:
def show_correlation(df):
    """Print the Pearson correlation matrix (computed with Koalas) of every stock."""
    for stock in get_names_stocks(df):
        corr_matrix = df.where(df.company_name == stock).to_koalas().corr('pearson')
        print("Correlation Matrix for", stock)
        print(corr_matrix)
        print("\n")

show_correlation(df=dataframe)

### Answering some questions

**What is the average of the opening and closing prices for each stock price and for different time periods (week, month, year) ?**  
We first iterate on the different stocks with the methods `get_names_stocks(df)` and `where()`.  
The column *Date* is first decomposed in years, months and weeks with the methods `year()`, `month()` and `weekofyear()`.  
Values are grouped by the input period of observation we are looking for and an average is done on *Open* and on *Close* with the method `agg()`.
We show the 5 first rows of each dataframe we construct.

In [None]:
from pyspark.sql.functions import year, month, weekofyear, avg

def avg_opening_closing(df, period='year', n=5):
    """Print the average Open and Close of every stock per calendar period.

    Weeks are grouped by (ISO year, ISO week) only. ``weekofyear()`` numbers
    weeks per ISO 8601, so a week may straddle a month or a calendar-year
    boundary. Adding the month to the key would split such a week in two,
    and using the calendar year would merge late-December days into week 1
    of the wrong year.

    Args:
        df: Spark dataframe with 'Date', 'Open', 'Close' and 'company_name' columns.
        period: 'year', 'month' or 'week'.
        n: number of rows shown per stock.

    Raises:
        ValueError: if *period* is not one of the supported values.
    """
    from pyspark.sql import functions as F

    if period == 'year':
        keys = [('Year', F.year('Date'))]
    elif period == 'month':
        keys = [('Year', F.year('Date')), ('Month', F.month('Date'))]
    elif period == 'week':
        # The ISO year of a week is the calendar year of its Thursday
        # (date_trunc('week') gives the Monday, + 3 days gives the Thursday).
        week_monday = F.to_date(F.date_trunc('week', F.col('Date')))
        keys = [('Year', F.year(F.date_add(week_monday, 3))), ('Week', F.weekofyear('Date'))]
    else:
        raise ValueError("period must be 'year', 'month' or 'week', got {!r}".format(period))

    key_names = [key for key, _ in keys]
    for name in get_names_stocks(df):
        print("Average of Open and Close for {} for every {}:".format(name, period))
        (df.where(df.company_name == name)
           .select(*[expr.alias(key) for key, expr in keys], 'Open', 'Close')
           .groupBy(*key_names)
           .agg({'Open': 'avg', 'Close': 'avg'})
           .orderBy(*key_names)
           .show(n))
        print("\n")



for period in ('year', 'month', 'week'):
    avg_opening_closing(dataframe, period=period)

**How do the stock prices change day to day and month to month for each stock ?**  
- To assess how the stock prices change from day to day, we compute the difference between the *Close* of day n and the *Close* of day n-1. To do this, we create a new column holding the *Close* of the previous observation and compute the difference. The PySpark `lead()` function with an offset of -1 (i.e. the previous row) is used to do so, which means that the first value of each stock is `null`. `lead()` requires a window: the window we specify is partitioned by company name and ordered by date. A new column called *day2day* is created and the dataframe is returned.
- The same approach is used to assess how the stock prices change from month to month. We compute the difference between the *Close* value of the last trading day of month n and that of month n-1. To do so, we create a dataframe containing only the last trading day of each month and join it with the original dataframe. That way, we have the *Close* value of the last day of each month, and we compute the difference between consecutive values just like before with the `lead()` function. A new column called *month2month* is created and the dataframe is returned.

We show the first 5 rows and the last 5 rows of each returned dataframe for each stock and for the day and month periods. 

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lead, month, dayofmonth, year

def day2day_change(df):
    """Add the day-to-day change of the Close price for every stock.

    Two columns are added:
      * "Previous Day's Close": Close of the previous observation of the same
        stock (ordered by 'Date'); null for the stock's first observation.
      * "day2day": Close minus "Previous Day's Close".

    The difference is computed with plain Spark column expressions. A Koalas
    round trip would attach a default index, and with Koalas' default
    'sequence' index type that moves all rows through a single partition.

    Args:
        df: Spark dataframe with 'Date', 'Close' and 'company_name' columns.

    Returns:
        *df* with the two extra columns.
    """
    from pyspark.sql.functions import col, lag

    window_spec = Window.partitionBy("company_name").orderBy('Date')
    return (df
            .withColumn("Previous Day's Close", lag("Close", 1).over(window_spec))
            .withColumn("day2day", col("Close") - col("Previous Day's Close")))

def month2month_change(df):
    """Add the month-to-month change of the month-end Close for every stock.

    For each stock, only the last trading day of every month is kept. The
    last day is computed per stock: computing it across all companies drops
    months of any stock whose last trading day differs from the others.
    Two columns are added:
      * "Previous Month's Close": month-end Close of the previous month of
        the same stock; null for the stock's first month.
      * "month2month": Close minus "Previous Month's Close".

    Args:
        df: Spark dataframe with 'Date', 'High', 'Low', 'Open', 'Close',
            'Volume', 'Adj Close' and 'company_name' columns.

    Returns:
        Spark dataframe with 'year', 'month', 'day', the price columns,
        'company_name' and the two extra columns (no 'Date' column).
    """
    from pyspark.sql.functions import col, lag

    decomposed_date_df = df.select(year('Date').alias('year'), month('Date').alias('month'), dayofmonth('Date').alias('day'),
                                   'High', 'Low', 'Open', 'Close', 'Volume', 'Adj Close', 'company_name')
    # Last trading day of each month, per stock.
    end_month_date_df = (decomposed_date_df.groupBy('company_name', 'year', 'month')
                         .agg({'day': 'max'})
                         .withColumnRenamed('max(day)', 'day'))
    month2month_df = (decomposed_date_df
                      .join(end_month_date_df, ['company_name', 'year', 'month', 'day'])
                      .select(decomposed_date_df.columns))  # keep the original column order

    window_spec = Window.partitionBy("company_name").orderBy('year', 'month')
    return (month2month_df
            .withColumn("Previous Month's Close", lag('Close', 1).over(window_spec))
            .withColumn("month2month", col('Close') - col("Previous Month's Close")))
    
# Build the day-to-day and month-to-month change dataframes and preview the
# first/last 5 rows of each stock for both.
dataframe_day2_day = day2day_change(dataframe)
print("Day to day change for each stock:")
print("\n")
show_first_last_nrows(dataframe_day2_day, n=5)
dataframe_month2month = month2month_change(dataframe)
print("Month to month change for each stock:")
print("\n")
show_first_last_nrows(dataframe_month2month, n=5)

**Calculate the daily return of each stock**  
The daily return is computed by the difference between the *Close* value and the *Open* value divided by the *Open* value. It is then multiplied by 100 to get a percentage.  
The dataframe is converted to a *Koalas* dataframe. A new column called *daily_return* is created with the method `assign()`.  
The dataframe is converted to *pyspark* dataframe and it is returned. We show the first 5 rows.

In [None]:
def daily_return(df):
    """Add a 'daily_return' column: ``100 * (Close - Open) / Open``.

    A plain ``withColumn`` is used instead of a Koalas round trip
    (to_koalas()/to_spark()). The round trip attaches a default index, which
    with Koalas' default 'sequence' type funnels all rows through a single
    partition, just to evaluate one column expression.

    Args:
        df: Spark dataframe with 'Open' and 'Close' columns.

    Returns:
        *df* with the extra 'daily_return' column (percentage).
    """
    from pyspark.sql.functions import col

    return df.withColumn("daily_return", 100 * ((col("Close") - col("Open")) / col("Open")))


dataframe_dr = daily_return(df=dataframe)
dataframe_dr.show(n=5)

**What are the stocks with the highest daily return ?**  
To answer this question we order the dataframe by the daily return we calculated in the previous cell. We use the method `orderBy()` to order *daily_return* values in a descending order. We show the 50 highest daily returns in our input dataframe. We can see that Tesla and ZOOM have the highest daily returns values.

In [None]:
def show_highest_daily(df, n=50):
    """Show the *n* rows with the largest daily return, all stocks combined."""
    ranked = daily_return(df).orderBy("daily_return", ascending=False)
    ranked.show(n)
    
show_highest_daily(dataframe, n=50)

**Calculate the average daily return for different periods (week, month and year) for each stock**  
We first calculate the daily return with the previous method `daily_return(dataframe)`. The *Date* field is decomposed into columns *year*, *month* and *week*. Depending on the period input of our method `avg_daily_return(df, period)`, we group by the specific period and we compute the average on the field *daily_return* with the `agg()` method. We finally order by the period of observation and we return the dataframe.

In [None]:
from pyspark.sql.functions import month, weekofyear, year, avg

def avg_daily_return(df, period):
    """Average the daily return of every stock per calendar period.

    Weekly averages are keyed by (ISO year, ISO week). ``weekofyear()``
    follows ISO 8601, so pairing it with the calendar year would lump e.g.
    2019-12-30 (ISO week 1 of 2020) together with the first week of 2019.

    Args:
        df: Spark dataframe with 'Date', 'Open', 'Close' and 'company_name' columns.
        period: 'year', 'month' or 'week'.

    Returns:
        Spark dataframe with the period key columns ('year', plus 'month' or
        'week'), 'company_name' and 'avg(daily_return)', ordered by period.

    Raises:
        ValueError: if *period* is not one of the supported values.
    """
    from pyspark.sql import functions as F

    if period == 'year':
        keys = [('year', F.year('Date'))]
    elif period == 'month':
        keys = [('year', F.year('Date')), ('month', F.month('Date'))]
    elif period == 'week':
        # ISO year = calendar year of the week's Thursday (Monday + 3 days).
        week_monday = F.to_date(F.date_trunc('week', F.col('Date')))
        keys = [('year', F.year(F.date_add(week_monday, 3))), ('week', F.weekofyear('Date'))]
    else:
        raise ValueError("period must be 'year', 'month' or 'week', got {!r}".format(period))

    key_names = [key for key, _ in keys]
    dr_df = daily_return(df)
    return (dr_df.select(*[expr.alias(key) for key, expr in keys], 'company_name', 'daily_return')
                 .groupBy(*key_names, 'company_name')
                 .agg({'daily_return': 'avg'})
                 .orderBy(*key_names))

    
# Average daily return of every stock, per week, month and year.
avg_week_dr_df = avg_daily_return(dataframe, 'week')
print("Average values of daily return for a week period:")
avg_week_dr_df.show()
avg_month_dr_df = avg_daily_return(dataframe, 'month')
print("Average values of daily return for a month period:")
avg_month_dr_df.show()
avg_year_dr_df = avg_daily_return(dataframe, 'year')
print("Average values of daily return for a year period:")
avg_year_dr_df.show()

### Moving Average  

The method `moving_avg(df, column_name, num_points)` computes the moving average. It takes three inputs:
- `df` the input dataframe
- `column_name` the name of the column to compute the moving average
- `num_points` the number of points on which we compute the average.  

The average is calculated with a window that partitions our dataframe by stock name and spans the number of points we want to average over. The method `rowsBetween()` is used to specify the number of observations taken to calculate the moving average. The dataframe with the additional column is returned.  
For the first `num_points` values, the average is done on the values that we have. Missing values are ignored.

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lead, month, dayofmonth, year, avg

def moving_avg(df, column_name, num_points):
    """Add '<column_name>_moving_avg': trailing average over *num_points* rows.

    The window is per stock, ordered by 'Date', and covers the current row
    plus the ``num_points - 1`` preceding rows. The first rows of a stock
    therefore average fewer values, and ``avg`` ignores nulls.
    """
    new_column = column_name + '_moving_avg'
    trailing_frame = (Window.partitionBy("company_name")
                      .orderBy("Date")
                      .rowsBetween(1 - num_points, 0))
    return df.withColumn(new_column, avg(column_name).over(trailing_frame))
    
column_name, number_observations = 'Open', 5
moved_avg_df = moving_avg(dataframe, column_name, number_observations)
print("Moving average for {} observations and for the column named '{}':".format(number_observations, column_name))
moved_avg_df.show()

### Correlation between stocks  

The correlation between stocks makes sense if we compute it for a same column. We first create a method `get_stock_values(df, company_names, column_name)` to get the values of the two input stocks for the input column. The parameters are the following:
- `df` the input dataframe
- `company_names` the two stocks names in a list.
- `column_name` the name of the column on which the correlation will be calculated.

On the input dataframe, we select the date and the values of the input column name for each input stock in a separate table. These two tables are then joined on *Date* column and the column *Date* is then dropped with the `drop()` method. The pyspark dataframe is then returned.

The correlation is then computed with the method `correlation(df)`, which takes as input a pyspark dataframe with only two columns. The correlation is computed with the method `corr()` from the Koalas library. We show the first 5 rows of the dataframe returned by `get_stock_values(df, company_names, column_name)` for the stocks Tesla and Zoom. We see that these two stocks are highly correlated.

In [None]:
def get_stock_values(df, company_names, column_name):
    """Put the *column_name* values of two stocks side by side.

    Args:
        df: Spark dataframe with 'Date', 'company_name' and *column_name* columns.
        company_names: sequence whose first two items are the stock names.
        column_name: column whose values are compared.

    Returns:
        Spark dataframe with one column '<column_name>_<stock>' per stock and
        one row per date present for both stocks (inner join on 'Date'; the
        'Date' column itself is dropped).
    """
    def _single_stock(stock):
        renamed = column_name + "_" + stock
        return (df.where(df.company_name == stock)
                  .select('Date', column_name)
                  .withColumnRenamed(column_name, renamed))

    left = _single_stock(company_names[0])
    right = _single_stock(company_names[1])
    return left.join(right, 'Date').drop('Date')

def correlation(df):
    """Return the Pearson correlation matrix of *df*, computed with Koalas."""
    return df.to_koalas().corr('pearson')
    

# 'Close' values of ZOOM and TESLA, one row per date present for both stocks.
# (Renamed from zoom_tesla_open_df: the dataframe holds 'Close', not 'Open'.)
zoom_tesla_close_df = get_stock_values(dataframe, ["ZOOM", "TESLA"], 'Close')
print("First 5 rows of the values from 'Close' column for stocks ZOOM and TESLA:")
zoom_tesla_close_df.show(5)
print("Result of the correlation between the stocks ZOOM and TESLA for the column 'Close':")
correlation(zoom_tesla_close_df)

### Return rate

Return rates are calculated based on calendar periods. If the period is one week, it is the difference between the *Close* of Friday with the *Open* of Monday of the same week divided by the *Open* value. The return rate is expressed as a percentage so we multiply by 100. If the period is one month, then we will take the *Open* value of the first day of the month and the *Close* of the last day of the month. We do the same for a period of one year.  
However, the first or last day of a month or a year may not be a trading day (on Saturdays and Sundays, for instance); in this case we take the next or the previous trading day.  
To do so, we decompose the date field in different ways depending on the input period:
- for weekly return rate, we decompose the date by year, week of year and day of week. This enables to group by the values of years and weeks and to take the first day and last day of the week present in our data. We use the `agg()` method to take the `min` and `max` in the values of day of the week. `min` and `max` are computed in two different dataframes so we merge them with the method `union()` and we join by dates values with the input dataframe with the `join()` method. We then use the `lead()` method with a window partitioned by year and week to take the *Open* value of the previous row (it corresponds to the min value we had queried just before). Therefore, we have a `null` value at each first row of each window. We remove those rows with the `dropna()` method. We also drop the columns *day* and *Open* because we don't need them anymore. Besides, the weekly return rate makes sense when provided with the number of the week and not with the number of the day. The dataframe is converted to a Koalas dataframe, and we compute the weekly return rate with the `assign()` method.
- for monthly return rate, we decompose the date by year, month and day of month. This enables to group by the values by years and months and to take the first day and last day of the month present in our data. Then, we take the same approach to get the monthly return rate
- for annual return rate, we decompose the date by year and day of year. This enables to group by the values by years and to take the first day and last day of the year present in our data. Then, we take the same approach to get the annual return rate

The method `return_rate(df, stock, period)` takes three inputs:
- df the input dataframe
- stock: the name of the stock on which to compute the return rate
- period: a string to specify the period (week, year or month)  

The method returns a dataframe with columns specifying the calendar period values, the stock name, the last *Close* value of the given calendar period, the first *Open* value of the given calendar period and the return rate value for this period.  
We show the 20 first rows of the monthly return rate dataframe for the stock ZOOM.

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lead, month, weekofyear, year, dayofmonth, dayofweek, dayofyear

def return_rate(df, stock, period):
    """Compute calendar-period return rates for one stock.

    For every calendar week, month or year present in the data, the rate is
    ``100 * (last Close - first Open) / first Open``. The first Open and last
    Close are taken on the first and last trading days of that period found
    in *df*. A period with a single trading day yields that day's
    Close-vs-Open return.

    Args:
        df: Spark dataframe with 'Date', 'Open', 'Close' and 'company_name' columns.
        stock: 'company_name' value of the stock to analyse.
        period: 'week', 'month' or 'year'.

    Returns:
        Spark dataframe ordered by period, with columns: the period key
        columns ('year', plus 'week' or 'month'), 'Close', 'company_name',
        'first_open_of_<period>' and the rate column ('weekly_return_rate',
        'monthly_return_rate' or 'annual_return_rate').

    Raises:
        ValueError: if *period* is not one of the supported values.
    """
    from pyspark.sql import functions as F

    if period == 'week':
        # weekofyear() numbers weeks per ISO 8601; the matching year is the
        # calendar year of the week's Thursday (Monday + 3 days). Pairing it
        # with year('Date') would put e.g. 2019-12-30 (ISO week 1 of 2020)
        # in the same group as the first week of January 2019.
        week_monday = F.to_date(F.date_trunc('week', F.col('Date')))
        keys = [('year', F.year(F.date_add(week_monday, 3))), ('week', F.weekofyear('Date'))]
        open_col, rate_col = 'first_open_of_week', 'weekly_return_rate'
    elif period == 'month':
        keys = [('year', F.year('Date')), ('month', F.month('Date'))]
        open_col, rate_col = 'first_open_of_month', 'monthly_return_rate'
    elif period == 'year':
        keys = [('year', F.year('Date'))]
        open_col, rate_col = 'first_open_of_year', 'annual_return_rate'
    else:
        raise ValueError("period must be 'week', 'month' or 'year', got {!r}".format(period))

    key_names = [key for key, _ in keys]
    stock_df = (df.where(df.company_name == stock)
                  .select(*[expr.alias(key) for key, expr in keys], 'Date', 'Open', 'Close', 'company_name'))

    # First and last trading day of every period present in the data.
    bounds_df = stock_df.groupBy(*key_names).agg(F.min('Date').alias('_first_date'),
                                                F.max('Date').alias('_last_date'))
    first_open_df = stock_df.select(*key_names, F.col('Date').alias('_first_date'), F.col('Open').alias(open_col))
    last_close_df = stock_df.select(*key_names, F.col('Date').alias('_last_date'), 'Close', 'company_name')

    result_df = (bounds_df
                 .join(first_open_df, key_names + ['_first_date'])
                 .join(last_close_df, key_names + ['_last_date'])
                 .withColumn(rate_col, 100 * ((F.col('Close') - F.col(open_col)) / F.col(open_col))))
    return result_df.select(*key_names, 'Close', 'company_name', open_col, rate_col).orderBy(*key_names)

return_rate(dataframe, stock='ZOOM', period='month').show()

### Given a specific date and period, what is the stock with the best return rate ?  
For this question, the periods are not calendar periods anymore. For example, if the input date is 2018-04-21 and the input period is month, the return rate is computed from the *Close* value on 2018-04-21 and the *Open* value 30 days earlier, on 2018-03-22 (or on the first trading day after it). 

The method `return_rate_by_date(df, stock, date, period)` computes the return rate for a specific stock, date and period.
To implement this function, we first check that the input date is in the correct format **YYYY-MM-DD** and that it appears in the dataframe for the stock (i.e. it is not a day when the market is closed). If either check fails, the method returns `None`.

In the other cases, the input date is split in year, month and day. With a `where()` and a `collect()` function, we get the *Close* value for the input stock at the input date. We then compute the date value corresponding to the beginning of the period thanks to the Python `datetime` module:
- if the period is a week, we take the date seven days before the input date, computed with `datetime.timedelta(days=7)`. However, the computed date may be a day when the market is closed. In this case we take the date 6 days before the input date, then 5 days if the market is also closed, and so on.
- if the period is a month, we take the date corresponding to 30 days before the input date. We use the same function but with `days=30` and the same approach as for a weekly period.
- if the period is a year, we take the date corresponding to 365 days before the input date. We use the same function but with `days=365` and the same approach as for a weekly period.

Since we have the beginning date period, we can collect the *Open* value in the dataframe for the input stock. We can compute the return rate for the given period, date and stock. The function returns the return rate value.

Another function called `show_highest_return_rate(df, date, period)` is the function showing the stock with the highest return rate for a given date and period. With a `for` loop, we iterate on the names of the stocks present in the input dataframe `df` (thanks to the method `get_names_stocks(df)` defined in the first cells). For each stock, we compute the return rate. At the end of the loop, we print the stock with the highest return rate. We also show the other stocks return rates values.

In [None]:
import datetime
from pyspark.sql.window import Window
from pyspark.sql.functions import lead, month, weekofyear, year, dayofmonth, dayofweek, dayofyear

def return_rate_by_date(df, stock, date, period):
    """Return the return rate (%) of *stock* over *period* ending on *date*.

    The end value is the Close on *date*. The start value is the Open on the
    earliest trading day of *stock* inside ``[date - N days, date]``, with
    N = 7 for 'week', 30 for 'month' and 365 for 'year'. That is the day
    reached by stepping forward one day at a time from ``date - N`` until a
    trading day is found, but it is obtained with a single range query
    instead of up to N separate Spark jobs.

    Args:
        df: Spark dataframe with 'Date', 'Open', 'Close' and 'company_name' columns.
        stock: 'company_name' value of the stock.
        date: end date as a 'YYYY-MM-DD' string.
        period: 'week', 'month' or 'year'.

    Returns:
        The return rate as a percentage, or None when *date* is not a valid
        'YYYY-MM-DD' string, *stock* has no observation on *date*, or
        *period* is not supported.
    """
    period_days = {'week': 7, 'month': 30, 'year': 365}
    if not isinstance(date, str) or period not in period_days:
        return None
    try:
        # Validate the format up front; malformed input used to crash in int().
        end_date = datetime.datetime.strptime(date, '%Y-%m-%d').date()
    except ValueError:
        return None

    stock_df = df.where(df.company_name == stock)
    # Compare against the normalized ISO string so '2020-2-28' also matches.
    end_rows = stock_df.where(stock_df.Date == str(end_date)).select('Close').take(1)
    if not end_rows:
        return None

    start_date = end_date - datetime.timedelta(days=period_days[period])
    # Never None: end_date itself lies in the range and has a row.
    start_row = (stock_df.where((stock_df.Date >= str(start_date)) & (stock_df.Date <= str(end_date)))
                 .orderBy('Date')
                 .first())
    start_open = start_row['Open']
    return 100 * ((end_rows[0]['Close'] - start_open) / start_open)
    
def show_highest_return_rate(df, date, period):
    """Print the stock with the highest return rate for *period* ending on *date*.

    The other stocks' rates are listed as well. Stocks without a rate for
    that date (return_rate_by_date() returned None) are reported as not
    available instead of crashing ``round(None, 2)``.

    Args:
        df: Spark dataframe with 'Date', 'Open', 'Close' and 'company_name' columns.
        date: end date as a 'YYYY-MM-DD' string.
        period: 'week', 'month' or 'year'.
    """
    return_rates = {name: return_rate_by_date(df, name, date, period) for name in get_names_stocks(df)}
    available = {name: rr for name, rr in return_rates.items() if rr is not None}
    if not available:
        print("Wrong date input !")
        return

    # max() keeps the first maximum encountered, like the original strict '>'.
    maximum_stock = max(available, key=available.get)
    maximum_rr = available[maximum_stock]
    print("Highest return rate for a period of one {} for the {} end date is:".format(period, date))
    print("---> {} with a return rate of {} %".format(maximum_stock, round(maximum_rr, 2)))
    print("Other return rates:")
    for name, rr in return_rates.items():
        if name == maximum_stock:
            continue
        if rr is None:
            print("---> Return rate of {} is not available for this date".format(name))
        else:
            print("---> Return rate of {} is {} %".format(name, round(rr, 2)))
        
show_highest_return_rate(dataframe, date='2020-02-28', period='month')

### Additional insights on data

**What is the biggest intraday price swing for each stock ?**  
To answer this, we first compute a new column based on the difference between *High* and *Low* divided by *Close* and multiplied by 100 to make it as a percentage for each stock. For each stock, we order by the values we calculated in a descending order and we take the first row with the method `collect()`. We then print the result for each stock.

In [None]:
from pyspark.sql.functions import col

def show_biggest_fluctuation(df):
    """Print, for every stock, its largest intraday swing and the date it occurred.

    The swing is ``100 * (High - Low) / Close``. Only the top row is fetched
    with ``first()``, rather than collecting every row of the stock to the
    driver. The value is read by column name instead of a positional index
    that depended on the input schema.
    """
    from pyspark.sql.functions import col

    df_fluctuation = df.withColumn("Swing", 100 * ((col("High") - col("Low")) / col("Close")))
    for name in get_names_stocks(df):
        row = (df_fluctuation.where(df_fluctuation.company_name == name)
               .orderBy("Swing", ascending=False)
               .first())
        print("For {} stock, the highest intrady price swing was of {} and it occured at the date {}".format(name, round(row["Swing"], 2), row.Date))
        print()

show_biggest_fluctuation(df=dataframe)

**What is the month with the highest average daily return for each stock ?**  
We first compute the average of the daily return for a month period with the function `avg_daily_return(df, "month")` defined in a previous cell. For each stock, we order the dataframe by the average daily return in a descending order and we collect the first value corresponding to the highest average daily return.

In [None]:
def show_month_highest_dr(df):
    """Print, for every stock, the month with the highest average daily return.

    Only the top row is fetched (``first()``) and fields are read by name,
    not by position.
    """
    avg_month_dr_df = avg_daily_return(df, "month")
    for name in get_names_stocks(df):
        row = (avg_month_dr_df.where(avg_month_dr_df.company_name == name)
               .orderBy("avg(daily_return)", ascending=False)
               .first())
        print("For {} stock, the highest average daily return in a month was of {} and occured at the month {} of the year {}".format(name, round(row["avg(daily_return)"], 3), row["month"], row["year"]))
        print()
        
show_month_highest_dr(df=dataframe)

**What is the month with the biggest number of trades for each stock ?**  
To answer this question, we select the year and the month of the *Date* column and the *Volume* and *company_name* columns. We iterate on the names of the stocks returned by the function `get_names_stocks(df)`. A `groupBy()` on the year and month values enables us to compute the average of the *Volume* values in these periods. The dataframe is ordered by these values in descending order. The first row corresponds to the month with the highest average *Volume* for this stock.

In [None]:
from pyspark.sql.functions import month, weekofyear, year, dayofmonth

def show_number_trades(df):
    """Print, for every stock, the month with the highest average 'Volume'.

    The metric is the mean of 'Volume' over the month's rows. Only the top
    row is fetched (``first()``) and fields are read by name, not by
    position.
    """
    date_df = df.select(year('Date').alias('year'), month('Date').alias('month'), "Volume", "company_name")
    for name in get_names_stocks(df):
        row = (date_df.where(date_df.company_name == name)
               .groupBy('year', 'month')
               .agg({'Volume': 'avg'})
               .orderBy("avg(Volume)", ascending=False)
               .first())
        print()
        print("For {} stock, the highest number of trades was of {} and it occured at the month {} of the year {}".format(name, round(row["avg(Volume)"]), row["month"], row["year"]))
    
show_number_trades(df=dataframe)

**When was the highest and lowest peak for each stock ?**
- To get the highest peak, we iterate on each stock and we order by the *High* values in a descending order. The first value corresponds to the highest peak of the given stock.
- To get the lowest peak, we iterate on each stock and we order by the *Low* values in an ascending order. The first value corresponds to the lowest peak of the given stock.

In [None]:
def show_highest_lowest_peak(df):
    """Print, for every stock, its highest 'High' and lowest 'Low' with their dates.

    Fields are read by name. The previous positional ``row_low[1]`` was the
    same column as ``row_high[1]`` (the High), so the lowest peak printed the
    High of that day instead of its Low. Nulls are sorted last so an
    ascending sort cannot pick a row with a missing 'Low'.
    """
    for name in get_names_stocks(df):
        stock_df = df.where(df.company_name == name)
        print("For {} stock :".format(name))
        row_high = stock_df.orderBy("High", ascending=False).first()
        print("---> The highest peak occured at the date {} and it reached a value of {}".format(row_high["Date"], round(row_high["High"], 2)))
        row_low = stock_df.orderBy(stock_df.Low.asc_nulls_last()).first()
        print("---> The lowest peak occured at the date {} and it reached a value of {}".format(row_low["Date"], round(row_low["Low"], 2)))
        print()
        
        
show_highest_lowest_peak(df=dataframe)