In [None]:
import pandas as pd
import yfinance as yf
import yahoo_fin.stock_info as si
from yahoo_fin.stock_info import get_data
import plotly.graph_objects as go
from ipywidgets import interact, widgets
from datetime import timedelta,datetime
from IPython.display import display
from pyspark.sql import SparkSession,Row
from pyspark.sql.functions import col,to_date,avg,stddev,mean,lit,count,when,corr,lag,udf,last,sum
from pyspark.sql.window import Window
from pyspark.sql.types import StructType,StructField,StringType,DoubleType,TimestampType,IntegerType

In [None]:
# Create the Spark session (or reuse one that already exists) under the application name "BigData".
spark = SparkSession.builder.appName("BigData").getOrCreate()

1. Exploration

We take Apple (ticker AAPL) as a first example:

In [None]:
# Daily AAPL quotes between the two dates, with the date kept as a regular column.
nas_aapl=get_data("aapl",start_date="11/30/2019",end_date="11/30/2024",index_as_date =False,interval="1d") #other intervals are available (from 1 minute to 3 months)
nas_aapl

In [None]:
# DataFrame.info() writes its summary to stdout itself and returns None,
# so wrapping it in print() only added a stray "None" line to the output.
nas_aapl.info()

In [None]:
# Summary statistics of the "close" column.
print(nas_aapl["close"].describe())

2. Pre-processing

First, we check how many different tickers are listed on the NASDAQ stock market, since this is the market we want to work on.

In [None]:
# Full list of NASDAQ tickers; only the first 30 are kept for the rest of the notebook.
nas_list=si.tickers_nasdaq()
nasdaq_list=nas_list[:30]

print("Tickers in Nasdaq:",len(nas_list))
print(nasdaq_list)

We load the data of the selected tickers into DataFrames so that it can be accessed by ticker name:

In [None]:
# Value columns of the two schemas: the daily schema has an "adjclose" column,
# the 1-minute schema does not. All of them are stored as doubles.
dayValueColumns=["open","high","low","close","adjclose","volume"]
minValueColumns=[name for name in dayValueColumns if name!="adjclose"]

structDay=StructType(
    [StructField("date",TimestampType(),True)]
    +[StructField(name,DoubleType(),True) for name in dayValueColumns]
    +[StructField("ticker",StringType(),True)]
)

structMin=StructType(
    [StructField("date",TimestampType(),True)]
    +[StructField(name,DoubleType(),True) for name in minValueColumns]
    +[StructField("ticker",StringType(),True)]
)

# Empty accumulators: the rows of every accepted ticker are appended with union() below.
dfday=spark.createDataFrame([],structDay)
dfmin=spark.createDataFrame([],structMin)

dateToday=datetime.today().strftime("%Y-%m-%d")
date7days=(datetime.today()-timedelta(days=7)).strftime("%Y-%m-%d")
valid_nasdaq_list=[]

for ticker in nasdaq_list:
    try:
        # Minute bars for the last 7 days and daily bars since 2014-11-30.
        data_tickers_min=get_data(ticker,start_date=date7days,index_as_date=True,interval="1m")
        data_tickers_d=get_data(ticker,start_date="11/30/2014",index_as_date=True,interval="1d")
        data_tickers_min["ticker"]=data_tickers_min["ticker"].astype("string")
        data_tickers_d["ticker"]=data_tickers_d["ticker"].astype("string")

        # A ticker is kept only when both downloads returned at least one row.
        if len(data_tickers_d)==0 or len(data_tickers_min)==0:
            print(f"{ticker} removed")
            continue

        # reset_index() turns the date index into the first column; union() matches
        # columns by position, so it lines up with the "date" field of the schemas.
        data_tickers_min=spark.createDataFrame(data_tickers_min.reset_index())
        data_tickers_d=spark.createDataFrame(data_tickers_d.reset_index())
        dfmin=dfmin.union(data_tickers_min)
        dfday=dfday.union(data_tickers_d)
        valid_nasdaq_list.append(ticker)
    except Exception as e:
        # Best effort: a ticker that cannot be downloaded is reported and skipped.
        print(f"{ticker} is not available now : {e}")
def dataEng(data,keep_time=False):
    """Clean a quotes DataFrame and add the high-low range of each bar.

    Parameters
    ----------
    data : Spark DataFrame
        Must contain the columns "date", "high" and "low".
    keep_time : bool, default False
        When False, "date" is truncated to a calendar date with to_date, which
        suits daily bars. When True, "date" is left untouched so that intraday
        bars keep their hour and minute.

    Returns
    -------
    Spark DataFrame
        The input plus a "variation" column (high - low), without the rows
        that contain a null value.
    """
    df=data
    if not keep_time:
        df=df.withColumn("date",to_date(col("date"))) #daily bars only need the calendar date
    df=df.withColumn("variation",col("high")-col("low")) #range between the highest and the lowest value of the bar
    df=df.na.drop()
    return df

df_day=dataEng(dfday)
# Minute bars must keep their time of day: truncating them to a date collapses every
# bar of a session onto the same timestamp, which breaks the intraday plots
# (their "%H:%M" labels and their 12-hour gap detection).
df_min=dataEng(dfmin,keep_time=True)

windowReturn=Window.partitionBy("ticker").orderBy("date")
# Trailing windows of at most 50 / 200 rows ending at the current row; the first rows
# of each ticker are therefore averaged over fewer observations.
window50=Window.partitionBy("ticker").orderBy("date").rowsBetween(-49,0)
window200=Window.partitionBy("ticker").orderBy("date").rowsBetween(-199,0)

# Simple return versus the previous close of the same ticker (null on the first row).
previousClose=lag("close",1).over(windowReturn)
df_day=df_day.withColumn("return",(col("close")-previousClose)/previousClose)
df_day=df_day.withColumn("SMA50",avg(col("close")).over(window50)) #SMA (Simple Moving Average) over 50 rows
df_day=df_day.withColumn("SMA200",avg(col("close")).over(window200)) #over 200 rows

In [None]:
# Preview 5 rows of the enriched daily data.
df_day.show(5)

We calculate the Sharpe ratio of each ticker and explain its meaning:

In [None]:
risk_free=0.02/252 #2% per year, spread over the 252 trading days of a year

# Explicit schema: inferring the types from the records fails when a column only
# holds None, when ints and floats are mixed across tickers, or when the list is empty.
sharpReturnSchema=StructType(
    [StructField("ticker",StringType(),True)]
    +[StructField(name,DoubleType(),True) for name in (
        "latestClose","SMA50","SMA200","sharpReturn","peRatio","betaRatio",
        "vola","revenueGrowth","dailyVolume","averageVolume",
    )]
)

def _toFloat(value):
    """Return `value` as a float, or None when it is missing, not numeric or NaN."""
    try:
        number=float(value)
    except (TypeError,ValueError):
        return None
    return None if number!=number else number #NaN is the only value different from itself

sharpReturnData=[]

for ticker in valid_nasdaq_list:
    # Fundamentals come from a remote service; a failed lookup must not abort the
    # whole loop, the ticker simply gets empty fundamentals.
    try:
        info=yf.Ticker(ticker).info or {}
    except Exception as e:
        print(f"{ticker} fundamentals are not available now : {e}")
        info={}

    dfreturn=df_day.filter(col("ticker")==ticker)

    # Mean and standard deviation of the returns in a single Spark job.
    stats=dfreturn.agg(avg("return").alias("avgReturn"),stddev("return").alias("vola")).first()
    returnR=stats["avgReturn"]
    vola=stats["vola"]

    # Most recent row by date. last() without an explicit ordering depends on the
    # physical row order, which Spark does not guarantee.
    latest=dfreturn.orderBy(col("date").desc()).select("close","SMA50","SMA200").first()
    if latest is None:
        latestClose=sma50=sma200=None
    else:
        latestClose,sma50,sma200=latest["close"],latest["SMA50"],latest["SMA200"]

    # The ratio is only defined with a known mean and a strictly positive volatility
    # (the comparison is also False for NaN).
    # NOTE(review): this is a daily, non-annualized ratio, while the label thresholds
    # 1 and 2 applied to it are usually quoted for annualized Sharpe ratios
    # (daily ratio * sqrt(252)) - confirm the intended scale.
    hasRatio=returnR is not None and vola is not None and vola>0
    sharpReturn=((returnR-risk_free)/vola) if hasRatio else None

    sharpReturnData.append({
        "ticker":ticker,
        "latestClose":_toFloat(latestClose),
        "SMA50":_toFloat(sma50),
        "SMA200":_toFloat(sma200),
        "sharpReturn":_toFloat(sharpReturn),
        "peRatio":_toFloat(info.get("trailingPE")),
        "betaRatio":_toFloat(info.get("beta")),
        "vola":_toFloat(vola),
        "revenueGrowth":_toFloat(info.get("revenueGrowth")),
        "dailyVolume":_toFloat(info.get("volume")),
        "averageVolume":_toFloat(info.get("averageVolume")),
    })

# Rows are passed as tuples in schema order so that values and fields always line up.
sharpReturnDf=spark.createDataFrame(
    [tuple(record[name] for name in sharpReturnSchema.fieldNames()) for record in sharpReturnData],
    sharpReturnSchema,
)
sharpReturnDf.show()

In [None]:
def sharpRatioLabel(ratio):
    """Translate a Sharpe ratio into a qualitative label.

    Parameters
    ----------
    ratio : float or None
        The ratio to label.

    Returns
    -------
    str or None
        "Bad" below 0, "Not so bad" in [0, 1), "Good" in [1, 2), "Amazing"
        from 2 upwards, and None when the ratio is missing (None or NaN).
    """
    # A ticker without a computable ratio reaches this function as None;
    # comparing None with a number would raise a TypeError.
    if ratio is None or ratio!=ratio:
        return None
    if ratio<0:
        return "Bad"
    if ratio<1: #includes exactly 0, which previously matched no branch
        return "Not so bad"
    if ratio<2:
        return "Good"
    return "Amazing"

# Expose the labelling function to Spark as a UDF producing a string column.
sharpRatioLabelF=udf(sharpRatioLabel,StringType())
sharpReturnDf=sharpReturnDf.withColumn("sharpRatioMeaning",sharpRatioLabelF(col("sharpReturn")))

def _points(condition,points):
    """Column worth `points` on the rows where `condition` holds, 0 elsewhere."""
    return when(condition,points).otherwise(0)

# ---- Long-term criteria ----
# P/E ratio: how much an investor pays for one dollar of earnings. Below 20 the company is
# considered under-valued; this criterion weighs 1.5x the revenue-growth and beta ones.
lowPE=col("peRatio").isNotNull()&(col("peRatio")<20)
# Revenue growth above 10% (0.1 = 10%).
growingRevenue=col("revenueGrowth").isNotNull()&(col("revenueGrowth")>0.1)
# Beta below 1: less volatile than the global market.
calmBeta=col("betaRatio").isNotNull()&(col("betaRatio")<1)
# Average volume above one million: the stock is actively traded.
liquid=col("averageVolume").isNotNull()&(col("averageVolume")>1000000)
# Latest close above the 200-row average and the 50-row average above the
# 200-row average: the price is rising and tends to keep rising.
upTrend=(
    col("latestClose").isNotNull()
    &col("SMA50").isNotNull()
    &col("SMA200").isNotNull()
    &(col("latestClose")>col("SMA200"))
    &(col("SMA50")>col("SMA200"))
)

longTermScore=(
    _points(lowPE,3)
    +_points(growingRevenue,2)
    +_points(calmBeta,2)
    +_points(liquid,1)
    +_points(upTrend,2)
)
sharpReturnDf=sharpReturnDf.withColumn("longTermScore",longTermScore)

# ---- Short-term criteria ----
# Risk-adjusted return above 1: the return is considered worth the risk.
rewarding=col("sharpReturn").isNotNull()&(col("sharpReturn")>1)
# Beta above 1: more volatile than the global market.
livelyBeta=col("betaRatio").isNotNull()&(col("betaRatio")>1)
# High volatility (standard deviation of the returns above 0.02) is favoured for short-term trades.
volatile=col("vola").isNotNull()&(col("vola")>0.02)
# Volume above its average: unusual recent activity.
busyDay=col("dailyVolume").isNotNull()&(col("dailyVolume")>col("averageVolume"))
# Latest close above the 50-row average: recent price increase.
recentRise=col("latestClose").isNotNull()&col("SMA50").isNotNull()&(col("latestClose")>col("SMA50"))

shortTermScore=(
    _points(rewarding,3)
    +_points(livelyBeta,2)
    +_points(volatile,2)
    +_points(busyDay,2)
    +_points(recentRise,1)
)
sharpReturnDf=sharpReturnDf.withColumn("shortTermScore",shortTermScore)

# Best long-term candidates first, ties broken by the risk-adjusted return.
sharpReturnDf=sharpReturnDf.orderBy(col("longTermScore").desc(),col("sharpReturn").desc())
sharpReturnDf.show()

In [None]:
# Number of minute-level rows per ticker, computed with one grouped Spark job instead of
# one count() job per ticker. The result is stored in rowCounts rather than in a variable
# named `count`, which would shadow pyspark.sql.functions.count imported at the top.
rowCounts={row["ticker"]:row["count"] for row in df_min.groupBy("ticker").count().collect()}
for ticker in valid_nasdaq_list:
    print(f"{ticker} : {rowCounts.get(ticker,0)}")

In [None]:
# Spark DataFrames have no pandas-style isna(); count the nulls of each column explicitly.
# `sum` is pyspark.sql.functions.sum (imported at the top), not the Python builtin.
df_min.select([
    sum(when(col(name).isNull(),1).otherwise(0)).alias(name)
    for name in df_min.columns
]).show()

3. Analysis and visualizations

Interface to help you choose a company according to the desired investment horizon:

In [None]:
def recommandations(termTime):
    """Print the companies ranked for the requested investment horizon.

    "Long Term" ranks by longTermScore; any other value ranks by shortTermScore.
    Ties are broken by sharpReturn, both in descending order. Only the ticker
    and the score used for the ranking are displayed.
    """
    if termTime=="Long Term":
        scoreColumn="longTermScore"
        title="Best companies to invest in for long-term investment:"
    else:
        scoreColumn="shortTermScore"
        title="Best companies to invest in for short-term investment:"

    sortDF=sharpReturnDf.orderBy(col(scoreColumn).desc(),col("sharpReturn").desc())
    print(f"\n{title}\n")
    sortDF.select("ticker",scoreColumn).show(truncate=False)

# Drop-down to pick the investment horizon; "Long Term" is preselected.
termTime=widgets.Dropdown(
    description="Term Time: ",
    options=["Long Term","Short Term"],
    value="Long Term",
)

def click(button):
    """Button callback: print the ranking for the horizon currently selected."""
    recommandations(termTime.value)

button=widgets.Button(description="Display")
button.on_click(click)

display(termTime,button)

Interface to show the variation in stock value of a company:

In [None]:
def filter_data_by_period(ticker,periode):
    """Return the quotes of `ticker` for the requested look-back period.

    Parameters
    ----------
    ticker : str
        Value matched against the "ticker" column.
    periode : str
        One of "1 Day", "1 Week", "1 Month", "6 Months", "1 Year", "5 Years".
        "1 Day" and "1 Week" read the minute-level frame (df_min), the other
        periods read the daily frame (df_day).

    Returns
    -------
    pandas.DataFrame
        The matching rows sorted by "date", with "date" converted to pandas
        datetimes (unparseable values become NaT).

    Raises
    ------
    ValueError
        If `periode` is not one of the supported labels.
    """
    now=datetime.today()
    lookbacks={
        "1 Week":timedelta(weeks=1),
        "1 Month":timedelta(weeks=4),
        "6 Months":timedelta(weeks=26),
        "1 Year":timedelta(weeks=52),
        "5 Years":timedelta(weeks=260),
    }

    if periode=="1 Day":
        # Start at midnight of the previous calendar day.
        start_date=(now-timedelta(days=1)).replace(hour=0,minute=0,second=0,microsecond=0)
    elif periode in lookbacks:
        start_date=now-lookbacks[periode]
    else:
        # Previously an unknown label left start_date unbound and failed further
        # down with an UnboundLocalError.
        raise ValueError(f"Unknown period {periode!r}; expected one of {['1 Day',*lookbacks]}")

    source=df_min if periode in ("1 Day","1 Week") else df_day
    filtered=source.filter((source["ticker"]==ticker)&(source["date"]>=start_date))

    filteredPandas=filtered.toPandas()
    filteredPandas["date"]=pd.to_datetime(filteredPandas["date"],errors="coerce")
    filteredPandas=filteredPandas.sort_values(by="date")
    return filteredPandas

def plot_ticker_with_period(ticker,periode):
    """Plot the close value of `ticker` over `periode`.

    The relative change between the first and the last close of the period is
    shown in the title. For the "1 Day" and "1 Week" periods the x axis is
    categorical and the line is interrupted wherever two consecutive points
    are more than 12 hours apart.
    """
    isDay=periode=="1 Day"
    isWeek=periode=="1 Week"
    sub=filter_data_by_period(ticker,periode)

    # Relative change over the period, in percent (0 when there is no data).
    if sub.empty:
        var=0
    else:
        firstClose=sub["close"].iloc[0]
        lastClose=sub["close"].iloc[-1]
        var=((lastClose-firstClose)/firstClose)*100
    varClose=f"+{var:.2f}%" if var>0 else f"{var:.2f}%"

    if isDay or isWeek:
        # Blank the close that follows a gap longer than 12 hours; with
        # connectgaps=False the line is then broken at that point.
        sub.loc[sub["date"].diff()>timedelta(hours=12),"close"]=None
        sub=sub.sort_values(by="date")
        labelColumn="heure" if isDay else "day"
        sub[labelColumn]=sub["date"].dt.strftime("%d %H:%M")
        x_label=sub[labelColumn]
    else:
        sub=sub.sort_values(by="date")
        x_label=sub["date"]

    closeTrace=go.Scatter(
        x=x_label,
        y=sub["close"],
        mode="lines",
        name=f"Close value ({ticker})",
        line=dict(color="blue",width=2),
        connectgaps=False
    )
    fig=go.Figure()
    fig.add_trace(closeTrace)

    # Intraday views use a categorical axis with a fixed number of ticks.
    if isDay:
        xaxiss=dict(title="Hour",type="category",nticks=24,showgrid=True)
    elif isWeek:
        xaxiss=dict(title="Date",type="category",nticks=7,showgrid=True)
    else:
        xaxiss=dict(title="Date",showgrid=True)

    fig.update_layout(
        title=f"Close values for {ticker} ({periode}) , {varClose}",
        xaxis=xaxiss,
        yaxis_title="Close value (in $)",
        template="plotly_white"
    )

    fig.show()

# Choices offered by the two drop-downs.
tickers=valid_nasdaq_list
periode=["1 Day","1 Week","1 Month","6 Months","1 Year","5 Years"]

tickerDropdown=widgets.Dropdown(options=tickers,description="Select Ticker: ")
periodDropdown=widgets.Dropdown(options=periode,description="Select Period: ")

# The plot is redrawn every time one of the two selections changes.
interact(plot_ticker_with_period,ticker=tickerDropdown,periode=periodDropdown)