In [None]:
# Install/upgrade packages into the kernel's environment (%pip targets the running kernel).
# NOTE(review): versions are unpinned, and the packages providing yfinance,
# vaderSentiment, bs4, requests, yaml and pandas (all imported in the next cell)
# are not installed here - confirm they come from the base environment.
%pip install --upgrade findspark
%pip install --upgrade virtualenv
%pip install --upgrade dateparser
%pip install --upgrade pyspark

In [35]:
import pandas as pd
import yfinance as yf
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
from bs4 import BeautifulSoup as bs
import requests
#from pyspark.sql.functions import sum,max,min,mean,count
import datetime as dt
import findspark

import yaml
from yaml.loader import SafeLoader
from os.path import abspath
import os
# Set before any Spark/pyarrow work starts.
# NOTE(review): presumably needed by pyspark's pandas/pyarrow conversion - confirm.
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"

# Absolute path of the local Spark SQL warehouse directory.
warehouse_location = abspath('spark-warehouse')
# Alias kept in case a later cell still refers to the old camelCase name.
warehouseLocation = warehouse_location

# cfg.yml holds the Spark master URL and the Postgres connection settings.
with open('cfg.yml') as f:
    config = yaml.load(f, Loader = SafeLoader)

# Locate the Spark installation and make pyspark importable.
findspark.init()

import pyspark
from pyspark.sql import SparkSession

# Build exactly ONE session. getOrCreate() hands back an already-running
# session, so creating a throwaway session first would make the master()
# setting below a no-op.
# NOTE(review): the JDBC writes later in this notebook need the PostgreSQL
# driver jar on Spark's classpath; this builder does not configure one.
spark = (
    SparkSession.builder
    .master(config['spark']['spark_master'])
    .appName("gather")
    .config("spark.sql.warehouse.dir", warehouse_location)
    # Correct key is "spark.executor.memory"; a misspelled key is silently ignored.
    .config("spark.executor.memory", "2g")
    .config("spark.worker.cleanup.enabled", "true")
    .config("spark.cleaner.periodicGC.interval", "10min")
    .enableHiveSupport()
    .getOrCreate()
)
spark.sparkContext.setLogLevel("WARN")
spark





RuntimeError: Java gateway process exited before sending its port number

In [None]:
# Connection settings for the Postgres database, read from cfg.yml.
url = config['postgres']['url']
props = {
    'user': config['postgres']['user'],
    # Previously this reused the 'user' entry as the password.
    # NOTE(review): assumes cfg.yml defines postgres.password - confirm.
    'password': config['postgres']['password'],
    'url': url,
    'driver': config['postgres']['driver'],
}

In [None]:
#retrieve headlines from financial post
def gather_headlines(company_name, ticker):
    """Scrape Financial Post search results for ``company_name``.

    Pages through the site's search endpoint in steps of ten (offsets
    10, 20, ... below 20000) and collects, for every article card, the date
    text and the headline text. Paging stops at the first page that yields no
    headlines instead of requesting every remaining offset.

    Args:
        company_name: Search text sent to financialpost.com.
        ticker: Value written to the ``ticker`` column of every row.

    Returns:
        pandas.DataFrame with columns ``date`` (raw string from the page),
        ``headline`` and ``ticker``.

    Raises:
        requests.RequestException: on connection failure or timeout.
    """
    search_url = "https://financialpost.com/search/"
    dates = []
    headlines = []
    for offset in range(10, 20000, 10):
        # Passing the query via `params` lets requests URL-encode the search
        # text (company names with spaces or '&' would otherwise break the URL).
        page = requests.get(
            search_url,
            params={
                "search_text": company_name,
                "date_range": "-3650d",
                "sort": "desc",
                "from": offset,
            },
            timeout=30,
        )
        parser = bs(page.content, "html.parser")
        if parser.body is None:
            # Not an HTML document (e.g. an error response): nothing to parse.
            break
        meta_blocks = parser.body.find_all('div', attrs={'class': 'article-card__meta-bottom'})
        headline_tags = parser.body.find_all('h3', class_ = 'article-card__headline text-size--extra-large--sm-up')
        if not headline_tags:
            # Ran past the last page of results.
            break
        # Pair date and headline page by page so one odd page cannot shift
        # every later date against the wrong headline.
        for meta, tag in zip(meta_blocks, headline_tags):
            parts = meta.text.split("   ")
            # The date is the second three-space-separated field; fall back to
            # the whole text if the card layout differs.
            dates.append(parts[1] if len(parts) > 1 else meta.text.strip())
            headlines.append(tag.text)
    result = pd.DataFrame({'date' : dates, "headline" : headlines})
    result['ticker'] = ticker
    return result

#calculate sentiment scores for each headlines and append to dataset
def analyze_sent(df):
    """Score every headline and store the result in a ``sentiment`` column.

    The score is the ``'compound'`` entry returned by
    ``SentimentIntensityAnalyzer.polarity_scores`` for the headline converted
    to ``str``. The frame is modified in place: the column is added, the frame
    is printed, and missing values in every column are replaced with 0.

    Args:
        df: pandas DataFrame with a ``headline`` column.

    Returns:
        The same DataFrame object that was passed in.
    """
    scorer = SentimentIntensityAnalyzer()

    def compound_score(headline):
        # Non-string cells are stringified before scoring.
        return scorer.polarity_scores(str(headline))['compound']

    df['sentiment'] = df['headline'].apply(compound_score)
    print(df)
    df.fillna(0, inplace = True)
    return df

def final_sentiment(df):
    """Add ``sent_score`` and drop the columns it was derived from.

    ``sent_score`` is ``mean_sentiment * headline_count ** 2``. Afterwards the
    ``headline``, ``headline_count`` and ``mean_sentiment`` columns are dropped.

    Args:
        df: frame exposing ``mean_sentiment`` and ``headline_count`` plus
            ``withColumn``/``drop`` (a Spark DataFrame in this notebook).

    Returns:
        The frame returned by ``drop``.
    """
    weighted_score = df.mean_sentiment * (df.headline_count ** 2)
    scored = df.withColumn("sent_score", weighted_score)
    return scored.drop('headline', 'headline_count', 'mean_sentiment')

In [None]:
import dateparser

# Tickers to process and, position for position, the search text used to look
# each one up on financialpost.com (the two lists are zipped together below).
ticker_list = ['MSFT','GOOG']
company_list = ['microsoft', 'google']
def process_headlines(ticker_list, comapny_list):
    """Scrape, score and aggregate headlines, then write them to Postgres.

    For every (ticker, company) pair the headlines are scraped with
    ``gather_headlines``, the date strings are parsed to ``datetime.date``,
    each headline gets a sentiment score from ``analyze_sent``, and the rows
    are aggregated per (date, ticker) into a headline count and a mean
    sentiment. ``final_sentiment`` turns that into ``sent_score`` and the
    result overwrites the ``sentiment`` table over JDBC.

    Args:
        ticker_list: Ticker symbols, paired by position with ``comapny_list``.
        comapny_list: Search text per ticker. The misspelled parameter name is
            kept (sic) so existing keyword callers keep working.

    Returns:
        None. Nothing is written when no usable headline rows were gathered.
    """
    # Imported locally: the module-level pyspark.sql.functions import is
    # commented out, and importing sum/max/min by name would shadow builtins.
    from pyspark.sql import functions as F

    # Use the parameter, not the global `company_list`.
    frames = [
        gather_headlines(company, tick)
        for tick, company in zip(ticker_list, comapny_list)
    ]
    if not frames:
        return
    full_df = pd.concat(frames, ignore_index=True)

    def _to_date(raw):
        # dateparser.parse returns None for text it cannot interpret.
        parsed = dateparser.parse(raw, date_formats = ["%d-%m-%y"])
        return parsed.date() if parsed is not None else None

    full_df['date'] = [_to_date(raw) for raw in full_df['date']]
    # Rows without a usable date cannot be grouped by day; drop them.
    full_df = full_df.dropna(subset=['date']).reset_index(drop=True)
    if full_df.empty:
        return

    full_df = analyze_sent(full_df)
    spark_df = spark.createDataFrame(full_df)
    aggregated = spark_df.groupBy('date', 'ticker').agg(
        F.count('headline').alias('headline_count'),
        F.mean('sentiment').alias("mean_sentiment"),
    )
    final_news = final_sentiment(aggregated)

    # writing spark df to postgres db
    # NOTE(review): URL and credentials are hard-coded here although cfg.yml
    # carries a `postgres` section - move them to config before sharing.
    final_news.write.format("jdbc")\
        .option("url", "jdbc:postgresql://localhost:5432/financials") \
        .option("driver", "org.postgresql.Driver").option("dbtable", "sentiment") \
        .option("user", "adam").option("password", "green").mode('overwrite').save()
process_headlines(ticker_list, company_list)

In [None]:
def get_financials(ticker, start):
    """Download price history for ``ticker`` with lower-case column names.

    The download starts 150 days before ``start``
    (presumably warm-up for the rolling windows in ``prep_financials`` -
    confirm).

    Args:
        ticker: Symbol passed to ``yf.download`` (converted with ``str``).
        start: ``datetime.date``/``datetime`` marking the nominal start day.

    Returns:
        pandas.DataFrame with a default integer index, a ``date`` column,
        the renamed price/volume columns and a ``ticker`` column.
    """
    lookback = dt.timedelta(days = 150)
    data = yf.download(str(ticker), start=start - lookback)
    # Newer yfinance releases can return (field, ticker) MultiIndex columns
    # even for one symbol; keep only the field level so the rename applies.
    if isinstance(data.columns, pd.MultiIndex):
        data.columns = data.columns.get_level_values(0)
    data['ticker'] = ticker
    # Reset the index BEFORE renaming: 'Date' is the index name until then,
    # so renaming first left the column called 'Date' instead of 'date'.
    data = data.reset_index().rename(columns = {
        'Date':'date', 'Open':'open', 'High':'high', 'Low':'low',
        'Close':'close', 'Adj Close': 'adj_close', 'Volume':'volume',
    })
    print('success!')
    return data
                       
                       
def EWMA(data, ndays):
    """Append an exponentially weighted moving average of the close price.

    Args:
        data: pandas DataFrame with a ``close`` column (the lower-case name
            produced by ``get_financials``); a legacy ``Close`` column is
            used when ``close`` is absent.
        ndays: Span of the average; also sets ``min_periods`` to ``ndays - 1``.

    Returns:
        A new DataFrame equal to ``data`` plus a column ``EWMA_<ndays>``.
        ``data`` itself is not modified.

    Raises:
        KeyError: if neither ``close`` nor ``Close`` is present.
    """
    # The rest of the pipeline uses lower-case column names; reading only
    # 'Close' raised KeyError on those frames.
    price_col = 'close' if 'close' in data.columns else 'Close'
    ema = (
        data[price_col]
        .ewm(span = ndays, min_periods = ndays - 1)
        .mean()
        .rename('EWMA_' + str(ndays))
    )
    return data.join(ema)

def rsi(close, periods = 14):
    """Relative strength index of a close-price series.

    Gains and losses are the positive and (sign-flipped) negative parts of the
    one-step difference. Each is smoothed with an exponentially weighted mean
    (``com = periods - 1``, ``adjust=True``, ``min_periods = periods``) and the
    result is ``100 - 100 / (1 + avg_gain / avg_loss)``.

    Args:
        close: pandas Series of prices.
        periods: Smoothing length; defaults to 14.

    Returns:
        pandas Series aligned with ``close``.
    """
    delta = close.diff()

    # Split the moves: upward moves keep their size, downward moves become
    # positive magnitudes.
    gains = delta.clip(lower=0)
    losses = -1 * delta.clip(upper=0)

    smoothing = dict(com = periods - 1, adjust=True, min_periods = periods)
    avg_gain = gains.ewm(**smoothing).mean()
    avg_loss = losses.ewm(**smoothing).mean()

    relative_strength = avg_gain / avg_loss
    return 100 - (100/(1 + relative_strength))

def BBANDS(data, window):
    """Add Bollinger-band columns computed from ``data.close``.

    ``MiddleBand`` is the rolling mean over ``window`` rows; ``UpperBand`` and
    ``LowerBand`` sit two rolling standard deviations above and below it.
    The columns are written onto ``data`` itself.

    Args:
        data: pandas DataFrame with a ``close`` column.
        window: Rolling window length in rows.

    Returns:
        The same DataFrame object, now carrying the three band columns.
    """
    rolling_close = data.close.rolling(window)
    middle = rolling_close.mean()
    band_width = 2 * rolling_close.std()
    data['MiddleBand'] = middle
    data['UpperBand'] = middle + band_width
    data['LowerBand'] = middle - band_width
    return data

def prep_financials(df):
    """Add the prediction target and technical indicators to a price frame.

    Added columns, in order: ``target`` (next row's close), simple moving
    averages ``10mda``/``20mda``/``50mda``/``100mda``, ``EWMA_20``/``EWMA_50``/
    ``EWMA_100``, ``rsi`` and the three Bollinger-band columns with a 40-row
    window. Rows containing any missing value are then dropped, which removes
    the indicator warm-up rows and the final row (it has no next close).

    Args:
        df: Price data for a single ticker with a ``close`` column; anything
            ``pd.DataFrame`` accepts.

    Returns:
        A new DataFrame with a fresh 0..n-1 index. The input is not modified.
    """
    # Work on a copy: BBANDS writes columns in place, which must not leak into
    # the caller's frame.
    df = pd.DataFrame(df).copy()
    df['target'] = df['close'].shift(-1)
    for window in (10, 20, 50, 100):
        df[str(window) + 'mda'] = df['close'].rolling(window).mean()
    for span in (20, 50, 100):
        df = EWMA(df, span)
    df['rsi'] = rsi(df['close'])
    df = BBANDS(df, 40)
    # The former set_index('date') / reset_index() calls discarded their
    # results and so did nothing; renumber the surviving rows explicitly.
    return df.dropna().reset_index(drop=True)

In [None]:
def process_finance(ticker_list):
    """Download, prepare and store price data for every ticker.

    Each ticker's history is fetched with ``get_financials`` (nominal start
    2018-01-01), enriched by ``prep_financials``, and the combined rows are
    appended to the ``company_data`` table over JDBC.

    Args:
        ticker_list: Iterable of ticker symbols.

    Returns:
        None. Nothing is written when ``ticker_list`` is empty.
    """
    finance_frames = [
        prep_financials(get_financials(tick, dt.date(2018, 1, 1)))
        for tick in ticker_list
    ]
    if not finance_frames:
        return
    combined = pd.concat(finance_frames, ignore_index=True)
    # createDataFrame needs the data; it was previously called with no
    # arguments, and the write below used an undefined `final_news`.
    final_finance = spark.createDataFrame(combined)
    # NOTE(review): URL and credentials are hard-coded here although cfg.yml
    # carries a `postgres` section - move them to config before sharing.
    # mode('append') adds the same rows again on every re-run.
    final_finance.write.format("jdbc")\
        .option("url", "jdbc:postgresql://localhost:5432/financials") \
        .option("driver", "org.postgresql.Driver").option("dbtable", "company_data") \
        .option("user", "adam").option("password", "green").mode('append').save()
# The bare name `process_finance` only displayed the function; call it.
process_finance(ticker_list)