In [40]:
from pyspark import SparkContext , SparkConf
from pyspark.sql import SparkSession

In [41]:
spark = SparkSession\
    .builder\
    .appName("Project_Stocks")\
    .getOrCreate()

In [42]:
#reading csv data 
stocks = spark.read.csv("StockData" , header = "True")

In [43]:
stocks.show(5)

+------+----------+----------+-------+--------+--------+--------+
|Ticker|      Date|Close/Last| Volume|    Open|    High|     Low|
+------+----------+----------+-------+--------+--------+--------+
| BRK-B|05/31/2023|  $321.08 |6175417|$321.12 |$322.41 |$319.39 |
| BRK-B|05/30/2023|  $322.19 |3232461|$321.86 |$322.47 |$319.00 |
| BRK-B|05/26/2023|  $320.60 |3229873|$320.44 |$322.63 |$319.67 |
| BRK-B|05/25/2023|  $319.02 |4251935|$320.56 |$320.56 |$317.71 |
| BRK-B|05/24/2023|  $320.20 |3075393|$322.71 |$323.00 |$319.56 |
+------+----------+----------+-------+--------+--------+--------+
only showing top 5 rows



In [44]:
stocks.select(["Ticker", "Date" , "Open"]).show(5)

+------+----------+--------+
|Ticker|      Date|    Open|
+------+----------+--------+
| BRK-B|05/31/2023|$321.12 |
| BRK-B|05/30/2023|$321.86 |
| BRK-B|05/26/2023|$320.44 |
| BRK-B|05/25/2023|$320.56 |
| BRK-B|05/24/2023|$322.71 |
+------+----------+--------+
only showing top 5 rows



In [45]:
stocks.filter(stocks.Ticker == "MSFT").show()

+------+----------+----------+--------+--------+--------+--------+
|Ticker|      Date|Close/Last|  Volume|    Open|    High|     Low|
+------+----------+----------+--------+--------+--------+--------+
|  MSFT|05/31/2023|  $328.39 |45950550|$332.29 |$335.94 |$327.33 |
|  MSFT|05/30/2023|  $331.21 |29503070|$335.23 |$335.74 |$330.52 |
|  MSFT|05/26/2023|  $332.89 |36630630|$324.02 |$333.40 |$323.88 |
|  MSFT|05/25/2023|  $325.92 |43301740|$323.24 |$326.90 |$320.00 |
|  MSFT|05/24/2023|  $313.85 |23384890|$314.73 |$316.50 |$312.61 |
|  MSFT|05/23/2023|  $315.26 |30797170|$320.03 |$322.72 |$315.25 |
|  MSFT|05/22/2023|  $321.18 |24115660|$318.60 |$322.59 |$318.01 |
|  MSFT|05/19/2023|  $318.34 |27546700|$316.74 |$318.75 |$316.37 |
|  MSFT|05/18/2023|  $318.52 |27275990|$314.53 |$319.04 |$313.72 |
|  MSFT|05/17/2023|  $314.00 |24315010|$312.29 |$314.43 |$310.74 |
|  MSFT|05/16/2023|  $311.74 |26730350|$309.83 |$313.71 |$309.83 |
|  MSFT|05/15/2023|  $309.46 |16336550|$309.10 |$309.91 |$307.

In [46]:
stocks.filter((stocks.Ticker == "MSFT") & (stocks.Date == "05/26/2023")).show()

+------+----------+----------+--------+--------+--------+--------+
|Ticker|      Date|Close/Last|  Volume|    Open|    High|     Low|
+------+----------+----------+--------+--------+--------+--------+
|  MSFT|05/26/2023|  $332.89 |36630630|$324.02 |$333.40 |$323.88 |
+------+----------+----------+--------+--------+--------+--------+



In [47]:
stocks.filter(((stocks.Ticker == "MSFT") | (stocks.Ticker == "V") ) & (stocks.Date == "05/31/2023")).show()

+------+----------+----------+--------+--------+--------+--------+
|Ticker|      Date|Close/Last|  Volume|    Open|    High|     Low|
+------+----------+----------+--------+--------+--------+--------+
|  MSFT|05/31/2023|  $328.39 |45950550|$332.29 |$335.94 |$327.33 |
|     V|05/31/2023|  $221.03 |20460620|$219.96 |$221.53 |$216.14 |
+------+----------+----------+--------+--------+--------+--------+



In [48]:
stocks.filter((stocks.Ticker.isin("MSFT" , "QQQ" , "SPY" , "V" ,   "TSLA")) & (stocks.Date == "05/31/2023")).show()

+------+----------+----------+---------+--------+--------+--------+
|Ticker|      Date|Close/Last|   Volume|    Open|    High|     Low|
+------+----------+----------+---------+--------+--------+--------+
|  MSFT|05/31/2023|  $328.39 | 45950550|$332.29 |$335.94 |$327.33 |
|  TSLA|05/31/2023|  $203.93 |150711700|$199.78 |$203.95 |$195.12 |
|     V|05/31/2023|  $221.03 | 20460620|$219.96 |$221.53 |$216.14 |
|   SPY|05/31/2023|    417.85|110811800|  418.28|  419.22|  416.22|
|   QQQ|05/31/2023|    347.99| 65105380|  348.37|   350.6|  346.51|
+------+----------+----------+---------+--------+--------+--------+



In [49]:
stocks.printSchema()

root
 |-- Ticker: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Close/Last: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)



In [50]:
from pyspark.sql.functions import udf
from pyspark.sql.types import DateType


In [51]:
from datetime import datetime

In [52]:
def date_parser(date_str):
    try:
        return datetime.strptime(date_str, "%m/%d/%Y")
    except ValueError:
        return None  # Xử lý các giá trị không hợp lệ


In [53]:
date_parser_udf = udf(date_parser, DateType())


In [54]:
try:
    stocks = stocks.withColumn("ParsedDate", date_parser_udf(stocks["Date"]))
    stocks.show()
    stocks.printSchema()
except Exception as e:
    print("Error:", e)


+------+----------+----------+-------+--------+--------+--------+----------+
|Ticker|      Date|Close/Last| Volume|    Open|    High|     Low|ParsedDate|
+------+----------+----------+-------+--------+--------+--------+----------+
| BRK-B|05/31/2023|  $321.08 |6175417|$321.12 |$322.41 |$319.39 |2023-05-31|
| BRK-B|05/30/2023|  $322.19 |3232461|$321.86 |$322.47 |$319.00 |2023-05-30|
| BRK-B|05/26/2023|  $320.60 |3229873|$320.44 |$322.63 |$319.67 |2023-05-26|
| BRK-B|05/25/2023|  $319.02 |4251935|$320.56 |$320.56 |$317.71 |2023-05-25|
| BRK-B|05/24/2023|  $320.20 |3075393|$322.71 |$323.00 |$319.56 |2023-05-24|
| BRK-B|05/23/2023|  $323.11 |4031342|$328.19 |$329.27 |$322.97 |2023-05-23|
| BRK-B|05/22/2023|  $329.13 |2763422|$330.75 |$331.49 |$328.35 |2023-05-22|
| BRK-B|05/19/2023|  $330.39 |4323538|$331.00 |$333.94 |$329.12 |2023-05-19|
| BRK-B|05/18/2023|  $329.76 |2808329|$326.87 |$329.98 |$325.85 |2023-05-18|
| BRK-B|05/17/2023|  $327.39 |3047626|$325.02 |$328.26 |$324.82 |2023-05-17|

In [55]:
from pyspark.sql.types import FloatType
from pyspark.sql.functions import col
def num_parser(value):
    if isinstance(value, str):
        return float(value.strip("$"))
    elif isinstance(value, (int ,   float)):
        return value
    else :
        return None
    
parser_number = udf(num_parser , FloatType())

stocks = (stocks
          .withColumn("Open", parser_number(col("Open")))
          .withColumn("High", parser_number(col("High")))
          .withColumn("Low", parser_number(col("Low")))
          .withColumn("Close/Last", parser_number(col("Close/Last"))))
stocks.show()


+------+----------+----------+-------+------+------+------+----------+
|Ticker|      Date|Close/Last| Volume|  Open|  High|   Low|ParsedDate|
+------+----------+----------+-------+------+------+------+----------+
| BRK-B|05/31/2023|    321.08|6175417|321.12|322.41|319.39|2023-05-31|
| BRK-B|05/30/2023|    322.19|3232461|321.86|322.47| 319.0|2023-05-30|
| BRK-B|05/26/2023|     320.6|3229873|320.44|322.63|319.67|2023-05-26|
| BRK-B|05/25/2023|    319.02|4251935|320.56|320.56|317.71|2023-05-25|
| BRK-B|05/24/2023|     320.2|3075393|322.71| 323.0|319.56|2023-05-24|
| BRK-B|05/23/2023|    323.11|4031342|328.19|329.27|322.97|2023-05-23|
| BRK-B|05/22/2023|    329.13|2763422|330.75|331.49|328.35|2023-05-22|
| BRK-B|05/19/2023|    330.39|4323538| 331.0|333.94|329.12|2023-05-19|
| BRK-B|05/18/2023|    329.76|2808329|326.87|329.98|325.85|2023-05-18|
| BRK-B|05/17/2023|    327.39|3047626|325.02|328.26|324.82|2023-05-17|
| BRK-B|05/16/2023|    323.75|2139996|322.46|324.69|322.36|2023-05-16|
| BRK-

In [56]:
stocks.printSchema()

root
 |-- Ticker: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Close/Last: float (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Open: float (nullable = true)
 |-- High: float (nullable = true)
 |-- Low: float (nullable = true)
 |-- ParsedDate: date (nullable = true)



In [57]:
cleaned_stocks = stocks.select(["Ticker", "ParsedDate", "Volume", "Open", "Low", "High", "Close/Last"])

In [58]:
cleaned_stocks.show(5)

+------+----------+-------+------+------+------+----------+
|Ticker|ParsedDate| Volume|  Open|   Low|  High|Close/Last|
+------+----------+-------+------+------+------+----------+
| BRK-B|2023-05-31|6175417|321.12|319.39|322.41|    321.08|
| BRK-B|2023-05-30|3232461|321.86| 319.0|322.47|    322.19|
| BRK-B|2023-05-26|3229873|320.44|319.67|322.63|     320.6|
| BRK-B|2023-05-25|4251935|320.56|317.71|320.56|    319.02|
| BRK-B|2023-05-24|3075393|322.71|319.56| 323.0|     320.2|
+------+----------+-------+------+------+------+----------+
only showing top 5 rows



In [59]:
cleaned_stocks.describe(["Volume", "Open", "Low","High","Close/Last"]).show()

+-------+--------------------+------------------+------------------+------------------+------------------+
|summary|              Volume|              Open|               Low|              High|        Close/Last|
+-------+--------------------+------------------+------------------+------------------+------------------+
|  count|               15108|             15108|             15108|             15108|             15108|
|   mean|5.1868408793685466E7|180.09656566181036| 177.9982781513109| 182.1253348687101| 180.1256089860054|
| stddev| 5.496484129953463E7|101.16125813324396|100.26590135955209|101.96625521621728|101.14891782168517|
|    min|           100011880|             12.07|              11.8|             12.45|             11.93|
|    max|             9999164|            479.22|            476.06|            479.98|            477.71|
+-------+--------------------+------------------+------------------+------------------+------------------+



In [60]:
#bacsic GROUP BY

In [61]:
cleaned_stocks.groupBy("Ticker").max("Open").show(15)

+------+---------+
|Ticker|max(Open)|
+------+---------+
| BRK-B|   361.39|
|  MSFT|   344.62|
|  META|   381.68|
|  TSLA|   411.47|
|  AAPL|   182.63|
|  AMZN|    187.2|
| GOOGL|   151.25|
|  NVDA|   405.95|
|   TSM|   141.61|
|     V|   250.05|
|   QQQ|   405.57|
|   SPY|   479.22|
+------+---------+



In [62]:
cleaned_stocks.groupBy("Ticker").max("Open").withColumnRenamed("max(Open)" , "MaxStockPrice").show(15)

+------+-------------+
|Ticker|MaxStockPrice|
+------+-------------+
| BRK-B|       361.39|
|  MSFT|       344.62|
|  META|       381.68|
|  TSLA|       411.47|
|  AAPL|       182.63|
|  AMZN|        187.2|
| GOOGL|       151.25|
|  NVDA|       405.95|
|   TSM|       141.61|
|     V|       250.05|
|   QQQ|       405.57|
|   SPY|       479.22|
+------+-------------+



In [63]:
from pyspark.sql.functions import udf, col, max as max_ , sum as sum_
cleaned_stocks.groupBy("Ticker").agg(max_("Open")).alias("MaxStockPrice").show(15)

+------+---------+
|Ticker|max(Open)|
+------+---------+
| BRK-B|   361.39|
|  MSFT|   344.62|
|  META|   381.68|
|  TSLA|   411.47|
|  AAPL|   182.63|
|  AMZN|    187.2|
| GOOGL|   151.25|
|  NVDA|   405.95|
|   TSM|   141.61|
|     V|   250.05|
|   QQQ|   405.57|
|   SPY|   479.22|
+------+---------+



In [64]:
from pyspark.sql.functions import max as max_ , sum as sum_
cleaned_stocks.groupBy("Ticker").agg(
    max_("Open").alias("MaxStockPrice"),
    sum_("Volume").alias("TotalVolume")
).show(15)

+------+-------------+----------------+
|Ticker|MaxStockPrice|     TotalVolume|
+------+-------------+----------------+
| BRK-B|       361.39|   5.862401321E9|
|  MSFT|       344.62| 3.7976660472E10|
|  META|       381.68| 3.0148848043E10|
|  TSLA|       411.47|1.71802975076E11|
|  AAPL|       182.63| 1.3931006136E11|
|  AMZN|        187.2| 1.0450328743E11|
| GOOGL|       151.25| 4.3956560981E10|
|  NVDA|       405.95| 5.8787218324E10|
|   TSM|       141.61| 1.2506470104E10|
|     V|       250.05| 1.0410997871E10|
|   QQQ|       405.57| 6.0437153773E10|
|   SPY|       479.22|  1.079252853E11|
+------+-------------+----------------+



In [65]:
cleaned_stocks.show()

+------+----------+-------+------+------+------+----------+
|Ticker|ParsedDate| Volume|  Open|   Low|  High|Close/Last|
+------+----------+-------+------+------+------+----------+
| BRK-B|2023-05-31|6175417|321.12|319.39|322.41|    321.08|
| BRK-B|2023-05-30|3232461|321.86| 319.0|322.47|    322.19|
| BRK-B|2023-05-26|3229873|320.44|319.67|322.63|     320.6|
| BRK-B|2023-05-25|4251935|320.56|317.71|320.56|    319.02|
| BRK-B|2023-05-24|3075393|322.71|319.56| 323.0|     320.2|
| BRK-B|2023-05-23|4031342|328.19|322.97|329.27|    323.11|
| BRK-B|2023-05-22|2763422|330.75|328.35|331.49|    329.13|
| BRK-B|2023-05-19|4323538| 331.0|329.12|333.94|    330.39|
| BRK-B|2023-05-18|2808329|326.87|325.85|329.98|    329.76|
| BRK-B|2023-05-17|3047626|325.02|324.82|328.26|    327.39|
| BRK-B|2023-05-16|2139996|322.46|322.36|324.69|    323.75|
| BRK-B|2023-05-15|2191609|322.89|320.13|323.83|    323.53|
| BRK-B|2023-05-12|1938264|323.82|320.54|324.24|    322.49|
| BRK-B|2023-05-11|2549339| 321.0|319.81

In [66]:
from pyspark.sql.functions import udf, col, year, month, dayofmonth, weekofyear
from pyspark.sql.types import FloatType, DateType
cleaned_stocks = (cleaned_stocks
                  .withColumn("Year", year(col("ParsedDate")))
                  .withColumn("Month", month(col("ParsedDate")))
                  .withColumn("Day", dayofmonth(col("ParsedDate")))
                  .withColumn("Week", weekofyear(col("ParsedDate"))))

cleaned_stocks.show()

+------+----------+-------+------+------+------+----------+----+-----+---+----+
|Ticker|ParsedDate| Volume|  Open|   Low|  High|Close/Last|Year|Month|Day|Week|
+------+----------+-------+------+------+------+----------+----+-----+---+----+
| BRK-B|2023-05-31|6175417|321.12|319.39|322.41|    321.08|2023|    5| 31|  22|
| BRK-B|2023-05-30|3232461|321.86| 319.0|322.47|    322.19|2023|    5| 30|  22|
| BRK-B|2023-05-26|3229873|320.44|319.67|322.63|     320.6|2023|    5| 26|  21|
| BRK-B|2023-05-25|4251935|320.56|317.71|320.56|    319.02|2023|    5| 25|  21|
| BRK-B|2023-05-24|3075393|322.71|319.56| 323.0|     320.2|2023|    5| 24|  21|
| BRK-B|2023-05-23|4031342|328.19|322.97|329.27|    323.11|2023|    5| 23|  21|
| BRK-B|2023-05-22|2763422|330.75|328.35|331.49|    329.13|2023|    5| 22|  21|
| BRK-B|2023-05-19|4323538| 331.0|329.12|333.94|    330.39|2023|    5| 19|  20|
| BRK-B|2023-05-18|2808329|326.87|325.85|329.98|    329.76|2023|    5| 18|  20|
| BRK-B|2023-05-17|3047626|325.02|324.82

In [67]:
from pyspark.sql.functions import max as max_ , sum as sum_ , min as min_
yearly = cleaned_stocks.groupBy(["Ticker" , "Year"]).agg(max_("Open").alias("YearHigh") , min_("Open").alias("YearlyLow"))
yearly.show()

+------+----+--------+---------+
|Ticker|Year|YearHigh|YearlyLow|
+------+----+--------+---------+
| BRK-B|2023|   331.0|   294.68|
|  MSFT|2019|  159.45|    99.55|
|  MSFT|2021|  344.62|   212.17|
| BRK-B|2018|   224.0|   185.43|
|  MSFT|2020|  229.27|   137.01|
| BRK-B|2021|  300.88|   228.21|
|  MSFT|2018|  115.42|    95.14|
| BRK-B|2020|  233.92|    165.3|
|  MSFT|2023|  335.23|    223.0|
| BRK-B|2019|  227.27|   194.78|
|  MSFT|2022|  335.35|   217.55|
| BRK-B|2022|  361.39|   260.58|
|  META|2020|  300.16|   139.75|
|  META|2021|  381.68|    247.9|
|  TSLA|2019|    29.0|    12.07|
|  META|2018|  215.72|    123.1|
|  TSLA|2021|  411.47|   184.18|
|  TSLA|2018|    25.0|    17.02|
|  META|2022|  339.95|    90.08|
|  META|2019|  208.67|   128.99|
+------+----+--------+---------+
only showing top 20 rows



In [68]:
from pyspark.sql.functions import max as max_ , sum as sum_ , min as min_
weekly = cleaned_stocks.groupBy(["Ticker" , "Week"]).agg(max_("Open").alias("WeekHigh") , min_("Open").alias("WeeklyLow"))
weekly.show()

+------+----+--------+---------+
|Ticker|Week|WeekHigh|WeeklyLow|
+------+----+--------+---------+
|  MSFT|  28|  282.35|   101.15|
| BRK-B|  53|   230.5|   228.09|
|  MSFT|   9|  302.89|   111.26|
| BRK-B|   6|   323.8|   200.82|
|  MSFT|  30|  289.43|    106.3|
| BRK-B|  15|  352.98|   184.61|
|  MSFT|  20|  316.74|   123.87|
| BRK-B|  40|  282.59|   202.65|
|  MSFT|  19|  310.55|   124.29|
| BRK-B|  46|  313.67|   214.83|
|  MSFT|  18|  307.76|   127.36|
| BRK-B|  43|  292.77|   199.01|
|  MSFT|   7|  300.01|   106.14|
| BRK-B|  16|  350.86|   188.56|
|  MSFT|   2|  320.47|   101.64|
|  MSFT|  40|  296.22|   112.63|
|  MSFT|  41|  302.34|   105.35|
| BRK-B|  22|  321.86|   179.99|
|  MSFT|  49|  335.31|   105.82|
| BRK-B|  27|  279.79|    175.9|
+------+----+--------+---------+
only showing top 20 rows



In [69]:
from pyspark.sql.functions import max as max_ , sum as sum_ , min as min_
monthly = cleaned_stocks.groupBy(["Ticker" , "Month"]).agg(max_("Open").alias("MonthHigh") , min_("Open").alias("MonthlyLow"))
monthly.show()

+------+-----+---------+----------+
|Ticker|Month|MonthHigh|MonthlyLow|
+------+-----+---------+----------+
|  MSFT|    9|   304.17|    108.23|
| BRK-B|    6|    316.0|     175.9|
|  MSFT|    7|   289.43|      98.1|
|  MSFT|    2|   310.41|    102.87|
| BRK-B|    1|   322.22|    194.78|
|  MSFT|    8|   305.02|     105.4|
| BRK-B|    9|    286.7|    201.19|
|  MSFT|    3|   313.91|    109.16|
| BRK-B|    8|   305.22|    195.78|
|  MSFT|    6|    275.2|     97.38|
| BRK-B|   10|   297.98|    199.01|
|  MSFT|   10|   324.33|    103.66|
|  MSFT|    5|   335.23|     99.29|
| BRK-B|    3|   361.39|     165.3|
| BRK-B|    7|   297.42|    178.26|
| BRK-B|    5|    331.0|     168.8|
|  MSFT|   12|   343.15|     95.14|
|  MSFT|   11|   344.62|     101.8|
|  MSFT|    1|   335.35|     99.55|
| BRK-B|    2|    323.8|    200.82|
+------+-----+---------+----------+
only showing top 20 rows



In [70]:
from pyspark.sql.functions import max as max_ , sum as sum_ , min as min_
dayly = cleaned_stocks.groupBy(["Ticker" , "Day"]).agg(max_("Open").alias("DayHigh") , min_("Open").alias("DaylyLow"))
dayly.show()

+------+---+-------+--------+
|Ticker|Day|DayHigh|DaylyLow|
+------+---+-------+--------+
|  MSFT| 28| 343.15|   97.38|
|  MSFT|  9| 337.11|  101.65|
| BRK-B|  6| 341.17|  176.94|
|  MSFT| 30| 341.91|  103.66|
| BRK-B| 15|  332.7|  169.82|
|  MSFT| 20| 320.05|  101.37|
|  MSFT| 19| 342.64|   99.65|
|  MSFT| 18| 338.18|  100.01|
|  MSFT|  7| 331.64|  101.64|
| BRK-B| 16| 335.11|   174.0|
|  MSFT|  2| 330.31|    98.1|
| BRK-B| 22| 351.01|  174.86|
| BRK-B| 27|  330.6|   178.5|
| BRK-B|  1| 353.65|  176.18|
|  MSFT|  8|  337.3|  101.09|
| BRK-B|  9| 326.59|  181.79|
| BRK-B| 31|  359.0|  182.05|
| BRK-B| 18| 344.45|   174.0|
|  MSFT|  3| 335.35|   100.1|
|  MSFT| 27| 335.46|    99.3|
+------+---+-------+--------+
only showing top 20 rows



In [71]:
from pyspark.sql.functions import col

# Gán alias cho các DataFrame
weekly_alias = weekly.alias("w")
dayly_alias = dayly.alias("d")
yearly_alias = yearly.alias("y")  # Sử dụng alias cho DataFrame yearly, không phải cho hàm hay biến year
monthly_alias = monthly.alias("m") 
cleaned_stocks_alias = cleaned_stocks.alias("cs")
historic_stock_alias =historic_stock.alias("h")
historic_stock_1_alias =historic_stock_1.alias("h1") 
historic_stock_2_alias =historic_stock_2.alias("h2")


NameError: name 'historic_stock' is not defined

In [None]:
#Analysis with week data
from pyspark.sql.functions import col
historic_stock = cleaned_stocks_alias.join(
    weekly_alias,
    (col("cs.Ticker") == col("w.Ticker")) & (col("cs.Week") == col("w.Week"))
    , 'inner'
).drop(col("w.Ticker")).drop(col("w.Week"))
historic_stock.show(1)
#cleaned join yearly

In [None]:
#Analysis Stock with year data
from pyspark.sql.functions import col
historic_stock_1 = historic_stock_alias.join(
   yearly_alias,
    (col("h.Ticker") == col("y.Ticker")) & (col("h.Year") == col("y.Year"))
    , 'inner'
).drop(col("y.Ticker")).drop(col("y.Year"))
historic_stock_1.show(1)

In [None]:
# Join with monthly data
from pyspark.sql.functions import col
historic_stock_2 = historic_stock_1_alias.join(
   monthly_alias,
    (col("h1.Ticker") == col("m.Ticker")) & (col("h1.Month") == col("m.Month"))
    , 'inner'
).drop(col("m.Ticker")).drop(col("m.Month"))
historic_stock_2.show(1)

In [None]:
# join with dayly data
from pyspark.sql.functions import col
historic_stock_3 = historic_stock_2_alias.join(
   dayly_alias,
    (col("h2.Ticker") == col("d.Ticker")) & (col("h2.Day") == col("d.Day"))
    , 'inner'
).drop(col("d.Ticker")).drop(col("d.Day"))
historic_stock_3.show(1)

In [None]:
historic_stock_3.columns

In [None]:
final_stock = historic_stock_3.select(['Ticker', 'Year', 'Month', 'Day','Week', 'Volume' , 'Open' , 'Low', 'High', 'Close/Last' , 'YearHigh' , 'YearlyLow' , 'WeekHigh' ,'WeeklyLow' , 'MonthHigh' , 'MonthlyLow'])
final_stock.show()

In [None]:
# Assuming 'final_stock' is your DataFrame containing stock databases
final_stock.createOrReplaceTempView("StockDatabases")


In [None]:
# Assuming StockDatabases is your DataFrame
query_1 = spark.sql("SELECT * FROM StockDatabases WHERE Ticker = 'BRK-B' AND Year = '2023'")
query_1.show(10)


In [77]:
snapshot = cleaned_stocks.select(['Ticker' , 'ParsedDate' , 'Open'])
snapshot.show()

+------+----------+------+
|Ticker|ParsedDate|  Open|
+------+----------+------+
| BRK-B|2023-05-31|321.12|
| BRK-B|2023-05-30|321.86|
| BRK-B|2023-05-26|320.44|
| BRK-B|2023-05-25|320.56|
| BRK-B|2023-05-24|322.71|
| BRK-B|2023-05-23|328.19|
| BRK-B|2023-05-22|330.75|
| BRK-B|2023-05-19| 331.0|
| BRK-B|2023-05-18|326.87|
| BRK-B|2023-05-17|325.02|
| BRK-B|2023-05-16|322.46|
| BRK-B|2023-05-15|322.89|
| BRK-B|2023-05-12|323.82|
| BRK-B|2023-05-11| 321.0|
| BRK-B|2023-05-10|326.08|
| BRK-B|2023-05-09|324.87|
| BRK-B|2023-05-08|328.26|
| BRK-B|2023-05-05|323.36|
| BRK-B|2023-05-04|323.44|
| BRK-B|2023-05-03|327.13|
+------+----------+------+
only showing top 20 rows



In [82]:
from pyspark.sql.window import Window


In [87]:
windowSpec = Window.partitionBy("Ticker").orderBy("ParsedDate")

In [88]:
from pyspark.sql.functions import lag as lag_
snapshot.withColumn("PreviousOpen" , lag_("Open").over(windowSpec)).show()

+------+----------+-----+------------+
|Ticker|ParsedDate| Open|PreviousOpen|
+------+----------+-----+------------+
|  AAPL|2018-05-31|46.81|        null|
|  AAPL|2018-06-01| 47.0|       46.81|
|  AAPL|2018-06-04|47.91|        47.0|
|  AAPL|2018-06-05|48.27|       47.91|
|  AAPL|2018-06-06|48.41|       48.27|
|  AAPL|2018-06-07|48.54|       48.41|
|  AAPL|2018-06-08|47.79|       48.54|
|  AAPL|2018-06-11|47.84|       47.79|
|  AAPL|2018-06-12|47.85|       47.84|
|  AAPL|2018-06-13|48.11|       47.85|
|  AAPL|2018-06-14|47.89|       48.11|
|  AAPL|2018-06-15|47.51|       47.89|
|  AAPL|2018-06-18|46.97|       47.51|
|  AAPL|2018-06-19|46.29|       46.97|
|  AAPL|2018-06-20|46.59|       46.29|
|  AAPL|2018-06-21|46.81|       46.59|
|  AAPL|2018-06-22|46.53|       46.81|
|  AAPL|2018-06-25|45.85|       46.53|
|  AAPL|2018-06-26|45.75|       45.85|
|  AAPL|2018-06-27|46.31|       45.75|
+------+----------+-----+------------+
only showing top 20 rows



In [89]:
#Caculate moving average
movingAvagare = Window.partitionBy("Ticker").orderBy("ParsedDate").rowsBetween(-50 , 0 ) 

In [120]:
from pyspark.sql.functions import avg as avg_ , round as round_ 
movingAVG =(snapshot.withColumn("MA50" , avg_("Open").over(movingAvagare)).withColumn("MA50" , round_("MA50" , 2)))
movingAVG.show(3)
 

+------+----------+-----+-----+
|Ticker|ParsedDate| Open| MA50|
+------+----------+-----+-----+
|  AAPL|2018-05-31|46.81|46.81|
|  AAPL|2018-06-01| 47.0|46.91|
|  AAPL|2018-06-04|47.91|47.24|
+------+----------+-----+-----+
only showing top 3 rows



In [112]:
from pyspark.sql.functions import desc , row_number as rn
maximumStock = Window.partitionBy("Ticker").orderBy(snapshot.Open.desc())
result =  snapshot.withColumn("MaxOpen",  rn().over(maximumStock))
result.show()

+------+----------+------+-------+
|Ticker|ParsedDate|  Open|MaxOpen|
+------+----------+------+-------+
|  AAPL|2022-01-04|182.63|      1|
|  AAPL|2021-12-13|181.12|      2|
|  AAPL|2021-12-28|180.16|      3|
|  AAPL|2022-01-05|179.61|      4|
|  AAPL|2021-12-30|179.47|      5|
|  AAPL|2021-12-29|179.33|      6|
|  AAPL|2021-12-16|179.28|      7|
|  AAPL|2022-03-30|178.55|      8|
|  AAPL|2021-12-31|178.09|      9|
|  AAPL|2022-03-31|177.84|     10|
|  AAPL|2022-01-03|177.83|     11|
|  AAPL|2022-04-05| 177.5|     12|
|  AAPL|2023-05-31|177.33|     13|
|  AAPL|2021-12-27|177.09|     14|
|  AAPL|2023-05-30|176.96|     15|
|  AAPL|2022-03-29|176.69|     16|
|  AAPL|2023-05-19|176.39|     17|
|  AAPL|2022-01-12|176.12|     18|
|  AAPL|2022-02-09|176.05|     19|
|  AAPL|2021-12-23|175.85|     20|
+------+----------+------+-------+
only showing top 20 rows



In [None]:
#Saving data 


In [116]:
#Parquet format
(result.write.options(header=True)
        .partitionBy("Ticker", "ParsedDate")
        .mode("overwrite")
        .csv("result_csv"))


                                                                                

In [117]:
(result.write.options(header=True)
        .partitionBy("Ticker", "ParsedDate")
        .mode("overwrite")
        .parquet("parquet_csv"))

                                                                                

In [121]:
(movingAVG.write.options(header=True)
        .partitionBy("Ticker", "ParsedDate")
        .mode("overwrite")
        .csv("movingAVG_csv"))

                                                                                