In [None]:
%%sh
pip install kafka-python psycopg2-binary elasticsearch

In [None]:
import time
import json
import random
import logging
import requests
from kafka import KafkaProducer
from kafka.errors import KafkaError

import os

STATS_SYMBOLS = ['IBM', 'AAPL', 'MSFT', 'AMZN', 'FB', 'TSLA', 'BABA', 'TSM']
KAFKA_BROKER = "172.27.0.12:9092"
KAFKA_TOPIC = "my_test3"
ALPHAVANTAGE_URL = 'https://www.alphavantage.co/query'
HTTP_TIMEOUT_SECONDS = 60        # network timeout for each Alpha Vantage request
KAFKA_SEND_TIMEOUT_SECONDS = 10  # how long to wait for each send acknowledgement

# Alpha Vantage key, overridable through the environment.
# TODO(security): the literal fallback is committed with the notebook - rotate it and drop the default.
API_KEY = os.environ.get('ALPHAVANTAGE_API_KEY', 'Z1OYPV3VSJIAWIBV')

# Publish every day of the daily-adjusted series of each symbol in STATS_SYMBOLS to KAFKA_TOPIC,
# one JSON message per day: the raw Alpha Vantage fields plus 'date', 'symbol' and 'type'.

# logging.basicConfig(level=logging.DEBUG)
producer = KafkaProducer(bootstrap_servers=[KAFKA_BROKER])

for symbol in STATS_SYMBOLS:
    # Let requests build and URL-encode the query string.
    r = requests.get(ALPHAVANTAGE_URL, params={
        'function': 'TIME_SERIES_DAILY_ADJUSTED',
        'symbol': symbol,
        'outputsize': 'full',
        'apikey': API_KEY,
    }, timeout=HTTP_TIMEOUT_SECONDS)
    r.raise_for_status()
    payload = r.json()

    # The series key can be missing (presumably an API error or rate-limit notice in the body);
    # report it and continue with the remaining symbols instead of aborting on a KeyError.
    data = payload.get('Time Series (Daily)')
    if data is None:
        print('skipping {}: no daily series in response: {}'.format(symbol, payload))
        continue

    futures = []
    for date, values in data.items():
        record = dict(values, date=date, symbol=symbol, type='daily_adjusted_series')
        futures.append(producer.send(KAFKA_TOPIC, json.dumps(record).encode('utf-8')))

    # Flush once per symbol rather than after every record, then surface any failed
    # sends together with their cause.
    producer.flush()
    for future in futures:
        try:
            future.get(timeout=KAFKA_SEND_TIMEOUT_SECONDS)
        except KafkaError as exc:
            print('err: {}'.format(exc))

In [None]:
try:
    from urllib.request import urlopen
except ImportError:
    from urllib2 import urlopen

import json
import time
import json
import random
import logging
import requests
from kafka import KafkaProducer
from kafka.errors import KafkaError

# Topic that receives the stock-news records published by this cell.
KAFKA_TOPIC = "my_test4"
KAFKA_BROKER = "172.27.0.12:9092"

# Producer used below; it sends raw bytes, so records are JSON-encoded before sending.
producer = KafkaProducer(
    bootstrap_servers=[KAFKA_BROKER],
)

def get_jsonparsed_data(url):
    """Fetch ``url`` and return its body parsed as JSON.

    The body is decoded as UTF-8 before parsing. The response is closed once read,
    even if reading fails.
    """
    # closing() works for both the urllib.request and urllib2 response objects.
    from contextlib import closing

    with closing(urlopen(url)) as response:
        payload = response.read().decode("utf-8")
    return json.loads(payload)

import os

# Financial Modeling Prep key, overridable through the environment.
# TODO(security): the literal fallback is committed with the notebook - rotate it and drop the default.
FMP_API_KEY = os.environ.get('FMP_API_KEY', 'a3ff475f3e9cd554cb4cd256735c08cd')

url = ("https://financialmodelingprep.com/api/v3/stock_news"
       "?tickers=AAPL,FB,GOOG,AMZN&limit=50&apikey=" + FMP_API_KEY)
data = get_jsonparsed_data(url)

# Each element is tagged by item assignment below, so anything other than a list of dicts
# (e.g. an error object) would fail with an obscure TypeError; report the payload instead.
if not isinstance(data, list):
    raise ValueError('unexpected stock_news response: {!r}'.format(data))

futures = []
for rec in data:
    rec['type'] = 'stock_news'
    futures.append(producer.send(KAFKA_TOPIC, json.dumps(rec).encode('utf-8')))

# Flush once for the whole batch, then surface any failed sends together with their cause.
producer.flush()
for future in futures:
    try:
        future.get(timeout=10)
    except KafkaError as exc:
        print('err: {}'.format(exc))

In [None]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from elasticsearch import Elasticsearch
import psycopg2
import json
import os
import time
import requests

# Pull the Kafka 0.8 streaming connector into PySpark; set here, ahead of the
# SparkContext.getOrCreate() call further down in this cell.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.7 pyspark-shell'

# Alpha Vantage daily-series field names -> column / document field names.
DAILY_SERIES_REKEY_MAP = dict([
    ('1. open', 'open'),
    ('2. high', 'high'),
    ('3. low', 'low'),
    ('4. close', 'close'),
    ('5. adjusted close', 'adjusted_close'),
    ('6. volume', 'volume'),
    ('7. dividend amount', 'dividend_amount'),
    ('8. split coefficient', 'split_coefficient'),
])

# Parameterized insert for one daily row: eight numeric fields, then date and symbol.
DAILY_SERIES_PG_QUERY = """
            insert into daily_adjusted_series
            (open, high, low, close, adjusted_close, volume, dividend_amount, split_coefficient, series_date, symbol)
            values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """

# Parameterized insert for one news item (seven columns).
STOCK_NEWS_PG_QUERY = """
            insert into stock_news
            (title, image, site, news_text, url, published_date, symbol)
            values (%s, %s, %s, %s, %s, %s, %s)
            """

def telegram_bot_send(msg, timeout=10):
    """Send ``msg`` to the configured Telegram chat through the Bot API ``sendMessage`` method.

    Args:
        msg: message text, sent with ``parse_mode=Markdown``.
        timeout: seconds to wait for the Telegram API (forwarded to ``requests.get``).

    Returns:
        The decoded JSON body of the Telegram API response.
    """
    # Overridable through the environment.
    # TODO(security): the literal fallbacks are committed with the notebook - rotate the token and drop them.
    bot_token = os.environ.get('TELEGRAM_BOT_TOKEN', '1799138792:AAGzrwurIqPpJp0Vc6SJ1MuoZl8zMJ3JEWs')
    bot_chat_id = os.environ.get('TELEGRAM_CHAT_ID', '131231613')
    # Pass the query via ``params`` so requests URL-encodes it. Concatenating the raw text broke
    # on characters such as '&' or '#', which appear in the SQL/error text sent by Pg.execute.
    # NOTE(review): with parse_mode=Markdown, unbalanced '_' or '*' in msg may still be rejected
    # by Telegram - confirm whether Markdown is needed at all.
    response = requests.get(
        'https://api.telegram.org/bot{}/sendMessage'.format(bot_token),
        params={'chat_id': bot_chat_id, 'parse_mode': 'Markdown', 'text': msg},
        timeout=timeout,
    )
    return response.json()
    
class Pg:
    """Thin psycopg2 wrapper that groups statements into one transaction per ``commit``.

    All statements passed to :meth:`execute` share one cursor until :meth:`commit` closes
    that cursor and commits the transaction.
    """

    def __init__(self, user="stockanalytics", password="stockanalytics",
                 host="172.27.0.55", port="5432", database="stockanalytics"):
        """Open the connection; the defaults are the notebook's database settings."""
        self._conn = psycopg2.connect(user=user, password=password, host=host,
                                      port=port, database=database)
        self._cur = None

    def _cursor(self):
        """Return the cursor of the current batch, opening a new one if none is open."""
        # Reusing one cursor fixes the leak where every execute() opened a cursor and only the
        # last one was ever closed.
        if self._cur is None or self._cur.closed:
            self._cur = self._conn.cursor()
        return self._cur

    def execute(self, command, args=()):
        """Execute ``command`` (with positional ``args`` if given) in the pending transaction.

        Nothing is persisted until :meth:`commit`. On ``IndexError`` (presumably a mismatch
        between ``args`` and the query's placeholders) the statement and its arguments are
        reported via Telegram and ``exit()`` is called.
        """
        cur = self._cursor()
        try:
            if args:
                cur.execute(command, args)
            else:
                cur.execute(command)
        except IndexError:
            telegram_bot_send('Error.')
            # str() each value so non-string parameters cannot break the error report itself.
            telegram_bot_send(','.join(map(str, args)))
            telegram_bot_send(command)
            exit()

    def commit(self):
        """Close the batch cursor (if one is open) and commit the transaction."""
        if self._cur is not None:
            self._cur.close()
            self._cur = None
        self._conn.commit()

class Elastic:
    """Small wrapper around the Elasticsearch client used by the streaming handler."""

    def __init__(self, hosts=('http://172.27.0.88:9200',), user='elastic', password=None):
        """Connect to ``hosts`` with HTTP basic auth.

        ``password`` defaults to the ELASTIC_PASSWORD environment variable, falling back to
        the literal this notebook used before.
        """
        # TODO(security): the literal fallback is committed with the notebook - rotate it and drop it.
        if password is None:
            password = os.environ.get('ELASTIC_PASSWORD', 'T4EoCBxfGZ4Ytm3GIzVu')
        self._es = Elasticsearch(list(hosts), http_auth=(user, password))

    def insert_index(self, doc, index="daily_series"):
        """Index ``doc`` into ``index`` and return the client's response."""
        return self._es.index(index=index, body=doc)

    def refresh(self, index="daily_series"):
        """Refresh ``index`` so documents indexed so far become visible to searches."""
        self._es.indices.refresh(index=index)


DAILY_SERIES_INDEX = 'daily_series'
STOCK_NEWS_INDEX = 'stock_news'

# Numeric fields of a daily row, in DAILY_SERIES_PG_QUERY column order (date and symbol follow).
DAILY_SERIES_VALUE_FIELDS = ('open', 'high', 'low', 'close', 'adjusted_close', 'volume',
                             'dividend_amount', 'split_coefficient')

# (column / document field, key in the incoming news record), in STOCK_NEWS_PG_QUERY column order.
STOCK_NEWS_FIELDS = (
    ('title', 'title'),
    ('image', 'image'),
    ('site', 'site'),
    ('news_text', 'text'),
    ('url', 'url'),
    ('published_date', 'publishedDate'),
    ('symbol', 'symbol'),
)

es = Elastic()
pg = Pg()

sc = SparkContext.getOrCreate()
sc.setLogLevel("WARN")

ssc = StreamingContext(sc, 60)  # 60-second micro-batches
opts = {"metadata.broker.list": "172.27.0.12:9092", "auto.offset.reset": "smallest", "group.id": "test_v1"}
kafkaStream = KafkaUtils.createDirectStream(ssc, ['my_test3', 'my_test4'], opts)
parsed = kafkaStream.map(lambda v: json.loads(v[1]))

# The stream starts from the smallest offset, so previously loaded records are presumably
# replayed on every start; empty both tables (in the transaction committed with the first
# batch) so neither table accumulates duplicates. Only daily_adjusted_series used to be cleared.
# NOTE(review): Elasticsearch documents get auto-generated ids, so replays still duplicate
# them unless the indices are recreated first.
pg.execute("delete from daily_adjusted_series")
pg.execute("delete from stock_news")


def _rekey(inp_dict, keys_replace):
    """Return a copy of ``inp_dict`` with keys renamed via ``keys_replace`` (others kept as-is)."""
    return {keys_replace.get(k, k): v for k, v in inp_dict.items()}


def _store_daily_series(el):
    """Write one daily_adjusted_series record to PostgreSQL and the daily-series index."""
    source = _rekey(el, DAILY_SERIES_REKEY_MAP)
    values = [source[field] for field in DAILY_SERIES_VALUE_FIELDS]
    pg.execute(DAILY_SERIES_PG_QUERY, tuple(values) + (source['date'], source['symbol']))

    doc = dict(zip(DAILY_SERIES_VALUE_FIELDS, values))
    doc['date'] = source['date']
    doc['symbol'] = source['symbol']
    es.insert_index(doc=doc, index=DAILY_SERIES_INDEX)


def _store_stock_news(el):
    """Write one stock-news record to PostgreSQL and the stock_news index."""
    values = [el[src] for _, src in STOCK_NEWS_FIELDS]
    pg.execute(STOCK_NEWS_PG_QUERY, tuple(values))
    # News documents previously went into the daily_series index; route them to their own index.
    doc = {dst: value for (dst, _), value in zip(STOCK_NEWS_FIELDS, values)}
    es.insert_index(doc=doc, index=STOCK_NEWS_INDEX)


def handler(rdd):
    """Persist one micro-batch of parsed Kafka records to PostgreSQL and Elasticsearch.

    Records whose 'type' is 'daily_adjusted_series' go to the daily tables/index; every other
    record is treated as stock news. PostgreSQL is committed once per non-empty batch, the
    touched indices are refreshed, and a Telegram notification is sent.
    """
    # A single collect() replaces isEmpty() + collect(), which ran two Spark jobs per batch.
    collection = rdd.collect()
    if not collection:
        return

    touched_indices = set()
    for el in collection:
        if el['type'] == 'daily_adjusted_series':
            _store_daily_series(el)
            touched_indices.add(DAILY_SERIES_INDEX)
        else:
            _store_stock_news(el)
            touched_indices.add(STOCK_NEWS_INDEX)

    pg.commit()
    # Refresh only indices written in this batch, so a not-yet-created index cannot fail the batch.
    for index in sorted(touched_indices):
        es.refresh(index=index)
    print('done')
    telegram_bot_send('Elastic and PostgreSQL were filled.')


parsed.foreachRDD(handler)

ssc.start()
ssc.awaitTermination()

done


In [None]:
import psycopg2


def create_tables(user="stockanalytics", password="stockanalytics", host="172.27.0.55",
                  port="5432", database="stockanalytics"):
    """(Re)create the ``daily_adjusted_series`` and ``stock_news`` tables.

    Existing tables are dropped first, so any stored rows are lost. Errors are printed rather
    than raised. Because the commit only runs after every statement succeeded, a failure
    leaves nothing committed.

    Args:
        user, password, host, port, database: connection settings forwarded to
            ``psycopg2.connect``; the defaults are the notebook's database.
    """
    # NOTE(review): decimal(12,2) keeps only two decimal places; confirm that sub-cent values
    # (e.g. dividend amounts or old adjusted closes) do not need more scale, and that
    # published_date should drop the time of day (it is a `date` column).
    commands = [
        "drop table if exists daily_adjusted_series",
        "drop table if exists stock_news",
        """
        create table daily_adjusted_series (
            id serial primary key,
            open decimal(12,2),
            high decimal(12,2),
            low decimal(12,2),
            close decimal(12,2),
            adjusted_close decimal(12,2),
            volume decimal(12,2),
            dividend_amount decimal(12,2),
            split_coefficient decimal(12,2),
            series_date date,
            symbol varchar(10)
        )
        """,
        """
        create table stock_news (
            id serial primary key,
            title text,
            image text,
            site text,
            news_text text,
            url text,
            published_date date,
            symbol varchar(10)
        )
        """,
    ]

    conn = None
    try:
        conn = psycopg2.connect(user=user, password=password, host=host,
                                port=port, database=database)
        # The cursor context manager closes the cursor even when a statement fails.
        with conn.cursor() as cur:
            for command in commands:
                cur.execute(command)
        conn.commit()
    except Exception as error:  # psycopg2.DatabaseError is already an Exception subclass
        print(error)
    finally:
        if conn is not None:
            conn.close()


create_tables()
print('done')

In [None]:
from elasticsearch import Elasticsearch
import psycopg2
import json
import os
import time
import requests

DAILY_SERIES_ELASTIC_MAPPING = {'properties': {
    'open': {'type': 'double'},
    'high': {'type': 'double'},
    'low': {'type': 'double'},
    'close': {'type': 'double'},
    'adjusted_close': {'type': 'double'},
    # `long` rather than `integer`: a 32-bit integer caps at 2,147,483,647 shares.
    'volume': {'type': 'long'},
    'dividend_amount': {'type': 'double'},
    'split_coefficient': {'type': 'double'},
    'date': {'type': 'date'},
    'symbol': {'type': 'text'}
}}

STOCK_NEWS_ELASTIC_MAPPING = {'properties': {
    'symbol': {'type': 'text'},
    # The default date format needs ISO-8601 ("T" separator); also accept a space-separated
    # timestamp. NOTE(review): presumably the format of FMP's publishedDate - confirm.
    'published_date': {'type': 'date',
                       'format': 'yyyy-MM-dd HH:mm:ss||strict_date_optional_time||epoch_millis'},
    'title': {'type': 'text'},
    'image': {'type': 'text'},
    'site': {'type': 'text'},
    'news_text': {'type': 'text'},
    'url': {'type': 'text'}
}}

# TODO(security): the literal fallback is committed with the notebook - rotate it and drop it.
es = Elasticsearch(['http://172.27.0.88:9200'],
                   http_auth=('elastic', os.environ.get('ELASTIC_PASSWORD', 'T4EoCBxfGZ4Ytm3GIzVu')))


def recreate_index(client, name, mapping):
    """Drop index ``name`` if it exists, then create it again with ``mapping``."""
    # ignore_unavailable: a missing index (e.g. on a fresh cluster) is not an error.
    client.indices.delete(index=name, ignore_unavailable=True)
    client.indices.create(index=name, body={'mappings': mapping})


recreate_index(es, 'daily_series', DAILY_SERIES_ELASTIC_MAPPING)
recreate_index(es, 'stock_news', STOCK_NEWS_ELASTIC_MAPPING)

print('done!')