# French Stock Market Analysis

In [8]:
!pip install pyspark



In [9]:
import pyspark
from pyspark.sql import SparkSession

In [10]:
spark = SparkSession.builder \
    .appName("French Stock Market Analysis") \
    .getOrCreate()


In [11]:
stocks = spark.read.csv("stock.csv",header = True)

In [12]:
stocks.count()

602962

In [13]:
#Data types
stocks.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Dividends: string (nullable = true)
 |-- Stock Splits: string (nullable = true)
 |-- Company: string (nullable = true)



In [14]:
#Selecting and filtering data
stocks.select(["Company","Date","Open","Close"]).filter((stocks.Company =="META") | (stocks.Company =="APPL")).show(10)
#stocks.select(["Company","Date","Open","Close"]).filter((stocks.Company isin(["META","APPL"])) ).show(10)

+-------+--------------------+----------------+----------------+
|Company|                Date|            Open|           Close|
+-------+--------------------+----------------+----------------+
|   META|2018-11-29 00:00:...|135.919998168945|138.679992675781|
|   META|2018-11-30 00:00:...|138.259994506836|140.610000610352|
|   META|2018-12-03 00:00:...|             143|141.089996337891|
|   META|2018-12-04 00:00:...|140.729995727539|137.929992675781|
|   META|2018-12-06 00:00:...|133.820007324219|139.630004882813|
|   META|2018-12-07 00:00:...|          139.25|137.419998168945|
|   META|2018-12-10 00:00:...|139.600006103516|141.850006103516|
|   META|2018-12-11 00:00:...|143.880004882813|142.080001831055|
|   META|2018-12-12 00:00:...|143.080001831055|           144.5|
|   META|2018-12-13 00:00:...|145.570007324219|145.009994506836|
+-------+--------------------+----------------+----------------+
only showing top 10 rows



In [15]:
#UDF : User defined functions
from pyspark.sql.functions import udf
from pyspark.sql.types import TimestampType # conserve le fuseau horaire contrairement à Datetype()
from dateutil.parser import parse

In [16]:
date_parse = udf(lambda date: parse(date), TimestampType())

In [17]:
stocks = stocks.withColumn("Date",date_parse(stocks.Date))

In [18]:
def num_parse(value):
    if isinstance(value, int) or isinstance(value, float):
        return value;
    elif isinstance(value, str):
        return float(value);

In [19]:
from pyspark.sql.types import FloatType
udf_num = udf(lambda num: num_parse(num),FloatType())
stocks = (stocks.withColumn("Open", udf_num(stocks.Open))
        .withColumn("High", udf_num(stocks.High))
        .withColumn("Low", udf_num(stocks.Low))
        .withColumn("Close", udf_num(stocks.Close)))

In [20]:
stocks.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: float (nullable = true)
 |-- High: float (nullable = true)
 |-- Low: float (nullable = true)
 |-- Close: float (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Dividends: string (nullable = true)
 |-- Stock Splits: string (nullable = true)
 |-- Company: string (nullable = true)



In [21]:
from pyspark.sql.types import IntegerType

parse_int= udf( lambda x: int(x),IntegerType())

In [22]:
stocks = (stocks.withColumn("Volume", parse_int(stocks.Volume)))

In [23]:
stocks.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: float (nullable = true)
 |-- High: float (nullable = true)
 |-- Low: float (nullable = true)
 |-- Close: float (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Dividends: string (nullable = true)
 |-- Stock Splits: string (nullable = true)
 |-- Company: string (nullable = true)



In [24]:
#Stock Splits and Dividends had to be deleted

In [25]:
cleaned_stocks = stocks.select(["Company","Date","Open","High","Low","Close","Volume"])

In [26]:
from pyspark.sql.functions import *

In [27]:
cleaned_stocks = cleaned_stocks.withColumn("Date",to_date(cleaned_stocks.Date))

In [28]:
#General Description of the data
cleaned_stocks.describe(["Open","Low","High","Close","Volume"]).show()

+-------+------------------+------------------+------------------+------------------+--------------------+
|summary|              Open|               Low|              High|             Close|              Volume|
+-------+------------------+------------------+------------------+------------------+--------------------+
|  count|            602962|            602962|            602962|            602962|              602962|
|   mean|140.07471066013434|138.27631646033322|141.85349172877469|140.09520381512718|   5895601.184908833|
| stddev|275.40172508018037|271.89527635964305|279.00319103885164|275.47796892551145|1.3815961832517391E7|
|    min|         1.0524246|         1.0261139|         1.0611948|         1.0348841|                   0|
|    max|           6490.26|            6405.0|            6525.0|           6509.35|          1123003300|
+-------+------------------+------------------+------------------+------------------+--------------------+



In [29]:
#Maximum open stock price for each company. Price has been sorted from the most expensive to the cheapiest
cleaned_stocks.groupBy("Company").max("Open").sort("max(Open)", ascending=[False])

DataFrame[Company: string, max(Open): float]

In [30]:
#Maximum open stock price for each company. Price has been sorted from the cheapiest the most expensive
cleaned_stocks.groupBy("Company").max("Open").sort("max(Open)", ascending=[True])

DataFrame[Company: string, max(Open): float]

In [31]:
cleaned_stocks.groupBy("Company").max("Open").withColumnRenamed("max(Open)","Max_Stock_Price")

DataFrame[Company: string, Max_Stock_Price: float]

In [32]:
import pyspark.sql.functions as func
cleaned_stocks.groupBy("Company").agg(func.max("Open").alias("Max_Stock_Price"))

DataFrame[Company: string, Max_Stock_Price: float]

In [33]:
cleaned_stocks.groupBy("Company").agg(
    func.max("Open").alias("Max_Stock_Price"),
    func.sum("Volume").alias("Total_Volume")
).show(15)

+-------+---------------+------------+
|Company|Max_Stock_Price|Total_Volume|
+-------+---------------+------------+
|   SPGI|      472.02524|  1855729493|
|    GIS|      89.177826|  4982867036|
|    LEN|      131.22336|  3557574424|
|    RTX|     102.514465|  8520180807|
|   HUBS|         852.08|   781645938|
|    KKR|       81.83995|  4080642206|
|    MMM|      183.76161|  4220433033|
|    PPG|      174.85457|  1782453221|
|    AXP|      192.61185|  4745736373|
|     CI|      335.46735|  2488919705|
|    CDW|      218.67297|  1104691716|
|    WEC|      103.12054|  1846435431|
|   BEKE|       76.19941|  7992445543|
|     PM|      101.56231|  6429937595|
|     FI|         129.74|  5031481516|
+-------+---------------+------------+
only showing top 15 rows



In [34]:
cleaned_stocks = (cleaned_stocks.withColumn("Day",func.dayofmonth(cleaned_stocks.Date))
                                 .withColumn("Month",func.month(cleaned_stocks.Date))
                                 .withColumn("Year",func.year(cleaned_stocks.Date))
                                 .withColumn("Week",func.weekofyear(cleaned_stocks.Date))
                              )

In [35]:
# Show by year, the highest and lowest price for each stocks
Byyear = cleaned_stocks.groupBy(["Company","Year"]).agg(func.max("OPen").alias("Year_High_Open"), func.min("Open").alias("Year_Low_Open"))

In [36]:
Byyear.show()

+-------+----+--------------+-------------+
|Company|Year|Year_High_Open|Year_Low_Open|
+-------+----+--------------+-------------+
|    BAC|2018|     25.729027|    20.312706|
|     PH|2018|     162.07875|    130.67992|
|     DG|2018|    107.714775|     94.84733|
|    BSX|2020|         45.96|        26.74|
|    MCK|2020|     182.81381|   116.187706|
|    MET|2020|     46.132656|    21.022701|
|   MPLX|2020|     17.714958|    5.4108133|
|    PPG|2020|     141.64787|     67.59178|
|    ADP|2021|     239.64296|    152.64398|
|    KKR|2021|      81.83995|    36.790466|
|    APO|2019|     42.198772|      19.7326|
|    ADP|2018|     134.43118|    111.71671|
|    GSK|2018|     32.493805|    29.153034|
|    BNS|2018|      42.85455|      37.9717|
|    CNC|2019|        68.045|        42.47|
|   NVDA|2020|     146.67644|     49.86448|
|    DIS|2020|        181.22|        84.49|
|   MRVL|2020|     47.198006|     16.97224|
|   ROST|2020|      118.9666|    57.565613|
|   MRNA|2020|        177.66|   

In [37]:
Bymonth = cleaned_stocks.groupBy(["Company","Year","month"]).agg(func.max("OPen").alias("Month_High_Open"), func.min("Open").alias("Month_Low_Open"))
Byweek = cleaned_stocks.groupBy(["Company","Year","Week"]).agg(func.max("OPen").alias("Week_High_Open"), func.min("Open").alias("Week_Low_Open"))

In [38]:
Byweek = Byweek.withColumn("Spread_Open",func.round(Byweek['Week_High_Open']-Byweek['Week_Low_Open'],2))

In [39]:
historic_stocks = cleaned_stocks.join(Byyear,
                                      (cleaned_stocks.Company==Byyear.Company) & (cleaned_stocks.Year==Byyear.Year),
                                     'inner').drop(Byyear.Year,Byyear.Company)

In [40]:
historic_stocks.show()

+----------+----------+----------+----------+----------+---------+---+-----+----+-------+----+--------------+-------------+
|      Date|      Open|      High|       Low|     Close|   Volume|Day|Month|Week|Company|Year|Year_High_Open|Year_Low_Open|
+----------+----------+----------+----------+----------+---------+---+-----+----+-------+----+--------------+-------------+
|2018-11-29|  25.01014| 25.161016|  24.82376| 24.885887| 46824600| 29|   11|  48|    BAC|2018|     25.729027|    20.312706|
|2018-11-29| 155.00827| 155.79898| 153.94171| 155.29329|  1018300| 29|   11|  48|     PH|2018|     162.07875|    130.67992|
|2018-11-29| 104.94107| 106.99986|104.435905| 105.69405|  3208500| 29|   11|  48|     DG|2018|    107.714775|     94.84733|
|2018-11-30| 24.797132| 25.409515| 24.735006| 25.205387| 64284600| 30|   11|  48|    BAC|2018|     25.729027|    20.312706|
|2018-11-30| 155.01747| 158.69522| 154.04285| 158.18033|  1187900| 30|   11|  48|     PH|2018|     162.07875|    130.67992|
|2018-11

In [41]:
cond = (historic_stocks.Company==Byweek.Company) & (historic_stocks.Year==Byweek.Year)

In [42]:
historic_stocks = historic_stocks.join(Byweek,cond,'inner').drop(historic_stocks.Company,Byweek.Year,Byweek.Week,Byweek.Spread_Open)

In [43]:
historic_stocks = historic_stocks.join(Bymonth,['Company','Year','Month'])

In [44]:
historic_stocks.columns

['Company',
 'Year',
 'Month',
 'Date',
 'Open',
 'High',
 'Low',
 'Close',
 'Volume',
 'Day',
 'Year_High_Open',
 'Year_Low_Open',
 'Week',
 'Week_High_Open',
 'Week_Low_Open',
 'Month_High_Open',
 'Month_Low_Open']

In [45]:
final_stocks = historic_stocks.select([
    'Company',
    'Day',
    'Month',
    'Year',
    'Week',
    'Volume',
    'Open',
    'High',
    'Low',
    'Close',
    'Month_High_Open',
    'Month_Low_Open',
    'Year_High_Open',
    'Year_Low_Open'
    ])

In [46]:
final_stocks.show()

+-------+---+-----+----+----+--------+---------+---------+----------+---------+---------------+--------------+--------------+-------------+
|Company|Day|Month|Year|Week|  Volume|     Open|     High|       Low|    Close|Month_High_Open|Month_Low_Open|Year_High_Open|Year_Low_Open|
+-------+---+-----+----+----+--------+---------+---------+----------+---------+---------------+--------------+--------------+-------------+
|    BAC| 29|   11|2018|  52|46824600| 25.01014|25.161016|  24.82376|24.885887|       25.01014|     24.797132|     25.729027|    20.312706|
|    BAC| 29|   11|2018|  50|46824600| 25.01014|25.161016|  24.82376|24.885887|       25.01014|     24.797132|     25.729027|    20.312706|
|    BAC| 29|   11|2018|  48|46824600| 25.01014|25.161016|  24.82376|24.885887|       25.01014|     24.797132|     25.729027|    20.312706|
|    BAC| 29|   11|2018|  51|46824600| 25.01014|25.161016|  24.82376|24.885887|       25.01014|     24.797132|     25.729027|    20.312706|
|    BAC| 29|   11|2

In [47]:
final_stocks.describe(['Year']).show()

+-------+------------------+
|summary|              Year|
+-------+------------------+
|  count|          30514453|
|   mean|2020.9485911151676|
| stddev|1.3901376029038048|
|    min|              2018|
|    max|              2023|
+-------+------------------+



In [48]:
final_stocks.registerTempTable('stocks')



In [49]:
spark.sql("\
Select * \
from stocks \
where Company ='MSFT' \
and Year == '2020' \
and Month == '10'").show(5)

+-------+---+-----+----+----+--------+--------+---------+---------+---------+---------------+--------------+--------------+-------------+
|Company|Day|Month|Year|Week|  Volume|    Open|     High|      Low|    Close|Month_High_Open|Month_Low_Open|Year_High_Open|Year_Low_Open|
+-------+---+-----+----+----+--------+--------+---------+---------+---------+---------------+--------------+--------------+-------------+
|   MSFT|  1|   10|2020|  45|27158400|207.3636|207.84926|205.25587|206.36316|      216.60065|     197.66025|     222.69075|    132.38866|
|   MSFT|  1|   10|2020|  14|27158400|207.3636|207.84926|205.25587|206.36316|      216.60065|     197.66025|     222.69075|    132.38866|
|   MSFT|  1|   10|2020|  46|27158400|207.3636|207.84926|205.25587|206.36316|      216.60065|     197.66025|     222.69075|    132.38866|
|   MSFT|  1|   10|2020|  15|27158400|207.3636|207.84926|205.25587|206.36316|      216.60065|     197.66025|     222.69075|    132.38866|
|   MSFT|  1|   10|2020|  33|27158

In [55]:
snap = cleaned_stocks.select(["Company","Date","Open"])

In [52]:
from pyspark.sql.window import Window as win

In [53]:
diff_day = win.partitionBy("Company").orderBy("Date")

In [56]:
snap.withColumn("Previous_Day_Open",func.lag("Open",1).over(diff_day)).show()

+-------+----------+---------+-----------------+
|Company|      Date|     Open|Previous_Day_Open|
+-------+----------+---------+-----------------+
|   AAPL|2018-11-29| 43.82976|             NULL|
|   AAPL|2018-11-30| 43.26107|         43.82976|
|   AAPL|2018-12-03| 44.26168|         43.26107|
|   AAPL|2018-12-04|43.419445|         44.26168|
|   AAPL|2018-12-06| 41.21428|        43.419445|
|   AAPL|2018-12-07|41.629402|         41.21428|
|   AAPL|2018-12-10|  39.5922|        41.629402|
|   AAPL|2018-12-11| 41.19028|          39.5922|
|   AAPL|2018-12-12|40.887936|         41.19028|
|   AAPL|2018-12-13|40.909534|        40.887936|
|   AAPL|2018-12-14|40.552002|        40.909534|
|   AAPL|2018-12-17|39.700184|        40.552002|
|   AAPL|2018-12-18|39.683372|        39.700184|
|   AAPL|2018-12-19|39.832146|        39.683372|
|   AAPL|2018-12-20|38.488407|        39.832146|
|   AAPL|2018-12-21| 37.63898|        38.488407|
|   AAPL|2018-12-24|35.548996|         37.63898|
|   AAPL|2018-12-26|

Calculate the average for each row from the past 50 days open.

In [58]:
avg_fift = win.partitionBy("Company").orderBy("Date").rowsBetween(-50,0)

In [61]:
(snap.withColumn('MA50',func.avg("Open").over(avg_fift))
    .withColumn('MA50', func.round('MA50',2))).show()

+-------+----------+---------+-----+
|Company|      Date|     Open| MA50|
+-------+----------+---------+-----+
|   AAPL|2018-11-29| 43.82976|43.83|
|   AAPL|2018-11-30| 43.26107|43.55|
|   AAPL|2018-12-03| 44.26168|43.78|
|   AAPL|2018-12-04|43.419445|43.69|
|   AAPL|2018-12-06| 41.21428| 43.2|
|   AAPL|2018-12-07|41.629402|42.94|
|   AAPL|2018-12-10|  39.5922|42.46|
|   AAPL|2018-12-11| 41.19028| 42.3|
|   AAPL|2018-12-12|40.887936|42.14|
|   AAPL|2018-12-13|40.909534|42.02|
|   AAPL|2018-12-14|40.552002|41.89|
|   AAPL|2018-12-17|39.700184| 41.7|
|   AAPL|2018-12-18|39.683372|41.55|
|   AAPL|2018-12-19|39.832146|41.43|
|   AAPL|2018-12-20|38.488407|41.23|
|   AAPL|2018-12-21| 37.63898|41.01|
|   AAPL|2018-12-24|35.548996|40.68|
|   AAPL|2018-12-26|35.584984| 40.4|
|   AAPL|2018-12-27| 37.39424|40.24|
|   AAPL|2018-12-28|37.792564|40.12|
+-------+----------+---------+-----+
only showing top 20 rows



For each stock, we ordered the open price from the highest to the lowest. We established a rank and took the top 5 to know when it happened

In [63]:
stock_max = win.partitionBy("Company").orderBy(snap.Open.desc())

In [66]:
snap.withColumn("Max_Open_Rank",func.row_number().over(stock_max)).filter("Max_Open_Rank<=5").show()

+-------+----------+---------+-------------+
|Company|      Date|     Open|Max_Open_Rank|
+-------+----------+---------+-------------+
|      A|2021-09-07|176.66948|            1|
|      A|2021-09-09|176.46252|            2|
|      A|2021-09-10|175.13217|            3|
|      A|2021-09-03|175.08292|            4|
|      A|2021-09-13|175.07306|            5|
|   AAPL|2023-08-01|195.71751|            1|
|   AAPL|2023-07-31|  195.538|            2|
|   AAPL|2023-07-27| 195.4981|            3|
|   AAPL|2023-07-20|194.57057|            4|
|   AAPL|2023-08-02|194.52069|            5|
|   ABBV|2022-04-11|163.48375|            1|
|   ABBV|2022-04-08|161.70775|            2|
|   ABBV|2023-04-25|161.38751|            3|
|   ABBV|2022-12-13|160.46742|            4|
|   ABBV|2022-12-09| 160.2848|            5|
|   ABEV|2019-07-30| 4.674537|            1|
|   ABEV|2019-07-31| 4.588291|            2|
|   ABEV|2019-08-01| 4.553793|            3|
|   ABEV|2019-08-02| 4.553793|            4|
|   ABEV|2

In [67]:
stock_result1 = final_stocks
result_Moving_Average50 = (snap.withColumn('MA50',func.avg("Open").over(avg_fift))
    .withColumn('MA50', func.round('MA50',2)))
Rank_Top5_Open = snap.withColumn("Max_Open_Rank",func.row_number().over(stock_max)).filter("Max_Open_Rank<=5")

In [71]:
(stock_result1.write.option("header",True)
              .mode("overWrite")
              .parquet('stock_df'))


In [73]:
(result_Moving_Average50.write.option("Header",True)
                              .mode("overWrite")
                              .csv('Moving_Average50'))