In [36]:
# Start ZooKeeper and the Kafka server (each has to run in a separate terminal)
#! zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties
#! kafka-server-start /usr/local/etc/kafka/server.properties

[2021-02-01 16:16:14,330] INFO Reading configuration from: /usr/local/etc/kafka/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2021-02-01 16:16:14,341] INFO clientPortAddress is 0.0.0.0:2181 (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2021-02-01 16:16:14,341] INFO secureClientPort is not set (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2021-02-01 16:16:14,345] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
[2021-02-01 16:16:14,345] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager)
[2021-02-01 16:16:14,345] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager)
[2021-02-01 16:16:14,345] WARN Either no config or no quorum defined in config, running  in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)
[2021-02-01 16:16:14,350] INFO Log4j 1.2 jmx support found and enabled. (org.apache.zookeeper.jmx.ManagedUt

KeyboardInterrupt: 

In [3]:

from __future__ import unicode_literals
import json
from time import sleep

from bs4 import BeautifulSoup
from kafka import KafkaConsumer, KafkaProducer
import requests

# -*- coding: utf-8 -*-
# CSS class strings passed as `class_` to get_trades to pick the calls / puts <table>.
# NOTE(review): these presumably mirror the class names generated by the Yahoo
# Finance options page and may stop matching if the page markup changes -- confirm.
calls_class = 'calls W(100%) Pos(r) Bd(0) Pt(0) list-options'
puts_class = 'puts W(100%) Pos(r) list-options'
# HTTP headers sent with the requests.get call made in get_trades.
headers = {
    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.181 Safari/537.36',
    'Pragma': 'no-cache'
    }

        
def get_trades(class_,
               base_url = 'https://finance.yahoo.com/quote/TSLA/options?straddle=false',
               timeout = 10):
    """Download ``base_url`` and return the first ``<table>`` matching ``class_``.

    Args:
        class_: CSS class string used to select the table.
        base_url: Page to download.
        timeout: Seconds to wait for the HTTP request before giving up, so a
            stalled connection cannot hang the notebook indefinitely.

    Returns:
        The matching BeautifulSoup element, or ``None`` when the request
        fails, the response status is not 200, or no table matches.
    """
    print('Accessing list')

    # Initialise up front so every failure path returns None instead of
    # raising UnboundLocalError on the final return.
    table = None
    try:
        r = requests.get(base_url, headers=headers, timeout=timeout)
        if r.status_code == 200:
            soup = BeautifulSoup(r.text, 'html.parser')
            table = soup.find('table', class_ = class_)
        else:
            print('Unexpected status code {}'.format(r.status_code))
    except Exception as ex:
        # Best effort: report the problem and fall through to return None.
        print('Exception in get_trades')
        print(str(ex))
    # Returning outside of `finally` lets KeyboardInterrupt / SystemExit propagate.
    return table

def publish_message(producer_instance, topic_name, key, value):
    """Send one key/value message to ``topic_name`` and flush the producer.

    ``key`` and ``value`` are text and are encoded to UTF-8 bytes before
    sending. Any exception raised while encoding, sending or flushing is
    caught and printed; nothing is re-raised and nothing is returned.
    """
    try:
        # Encode the key first, then the value, before handing both to send().
        payload = {
            'key': bytes(key, encoding='utf-8'),
            'value': bytes(value, encoding='utf-8'),
        }
        producer_instance.send(topic_name, **payload)
        # Block until the buffered record has actually been delivered.
        producer_instance.flush()
        print('Message published successfully.')
    except Exception as err:
        print('Exception in publishing message')
        print(str(err))

def connect_kafka_producer(server_address = ['localhost:9092']):
    """Create a KafkaProducer connected to ``server_address``.

    Args:
        server_address: List of ``host:port`` bootstrap servers. The default
            list is only read, never mutated.

    Returns:
        The connected producer, or ``None`` if the connection attempt raised
        an exception (the error is printed).
    """
    _producer = None
    try:
        _producer = KafkaProducer(bootstrap_servers = server_address, api_version=(0, 10))
    except Exception as ex:
        print('Exception while connecting Kafka')
        print(str(ex))
    # Return outside of `finally`: a `return` in `finally` would silently
    # discard KeyboardInterrupt / SystemExit raised while connecting.
    return _producer



## Scrape Calls

In [120]:

calls = get_trades(class_ = calls_class)

if calls is None:
    # No matching table was found; avoid publishing the literal string 'None'.
    print('No calls table found; nothing published.')
else:
    kafka_producer = connect_kafka_producer()
    # connect_kafka_producer returns None when the connection attempt failed.
    if kafka_producer is not None:
        try:
            publish_message(kafka_producer, 'TSLA_calls', 'raw', str(calls))
        finally:
            # Always release the connection, even if publishing raised.
            kafka_producer.close()

Accessing list
Message published successfully.


## Scrape Puts

In [122]:
puts = get_trades(class_ = puts_class)

if puts is None:
    # No matching table was found; avoid publishing the literal string 'None'.
    print('No puts table found; nothing published.')
else:
    kafka_producer = connect_kafka_producer()
    # connect_kafka_producer returns None when the connection attempt failed.
    if kafka_producer is not None:
        try:
            publish_message(kafka_producer, 'TSLA_puts', 'raw', str(puts))
        finally:
            # Always release the connection, even if publishing raised.
            kafka_producer.close()

Accessing list
Message published successfully.


## Parse By Row in Table

In [4]:
def publish(soup_in, topic, producer):
    """Parse each data row of an options table and publish it as JSON.

    The first ``<tr>`` is treated as the header and skipped. For every other
    row the known cells are looked up by tag and CSS class, their stripped
    text is collected into a dict, and the dict is sent to ``topic`` through
    ``publish_message`` with the key ``'clean'``.

    Args:
        soup_in: Parsed table element exposing ``find_all``; ``None`` is
            accepted and results in nothing being published.
        topic: Kafka topic name to publish to.
        producer: Producer handed through to ``publish_message``.

    Returns:
        None.
    """
    if soup_in is None:
        # An upstream scrape found no table; there is nothing to parse.
        print('No table to parse; nothing published.')
        return

    # (output field, tag name, CSS class) in the order the fields are emitted.
    field_specs = (
        ('contract', 'a', 'Fz(s) Ell C($linkColor)'),
        ('last_trade_dt', 'td', 'data-col1 Ta(end) Pstart(7px)'),
        ('strike', 'a', 'C($linkColor) Fz(s)'),
        ('last_price', 'td', 'data-col3 Ta(end) Pstart(7px)'),
        ('bid', 'td', 'data-col4 Ta(end) Pstart(7px)'),
        ('ask', 'td', 'data-col5 Ta(end) Pstart(7px)'),
        ('change', 'td', 'data-col6 Ta(end) Pstart(7px)'),
        ('pct_change', 'td', 'data-col7 Ta(end) Pstart(7px)'),
        ('volume', 'td', 'data-col8 Ta(end) Pstart(7px)'),
        ('open_interest', 'td', 'data-col9 Ta(end) Pstart(7px)'),
        ('implied_volatility', 'td', 'data-col10 Ta(end) Pstart(7px) Pend(6px) Bdstartc(t)'),
    )

    rows = soup_in.find_all('tr')[1:] # remove header from list
    for trade in rows:
        parsed_trade = {}
        for key, tag, css_class in field_specs:
            obs = trade.find(tag, {'class' : css_class})
            try:
                parsed_trade[key] = obs.text.strip()
            except AttributeError:
                # The cell was not found (obs is None): keep the row but
                # leave this field empty, and say which field was missing.
                print('fail to parse observation {}'.format(key))
                parsed_trade[key] = ''

        publish_message(producer, topic, 'clean', json.dumps(parsed_trade))


In [132]:

kafka_producer = connect_kafka_producer()
# connect_kafka_producer returns None when the connection attempt failed.
if kafka_producer is not None:
    try:
        publish(puts, 'TSLA_puts', kafka_producer)
    finally:
        # Always release the connection, even if parsing or publishing raised.
        kafka_producer.close()

Message published successfully.
Message published successfully.
Message published successfully.
Message published successfully.
Message published successfully.
Message published successfully.
Message published successfully.
Message published successfully.
Message published successfully.
Message published successfully.
Message published successfully.
Message published successfully.
Message published successfully.
Message published successfully.
Message published successfully.
Message published successfully.
Message published successfully.
Message published successfully.
Message published successfully.
Message published successfully.
Message published successfully.
Message published successfully.
Message published successfully.
Message published successfully.
Message published successfully.
Message published successfully.
Message published successfully.
Message published successfully.
Message published successfully.
Message published successfully.
Message published successfully.
Message 

## Production Gist

This page is refreshed continuously, so the best way to track it is to scrape the data continuously. We can deduplicate once the data is in Spark (I think). 

In [138]:
N_SCRAPES = 10           # number of scrape rounds before stopping
SCRAPE_INTERVAL_S = 10   # pause between rounds, in seconds

for ix in range(N_SCRAPES):
    print('Scraping...')
    kafka_producer = connect_kafka_producer()
    # connect_kafka_producer returns None when the connection attempt failed;
    # skip this round and retry after the pause.
    if kafka_producer is not None:
        try:
            calls = get_trades(class_ = calls_class)
            publish(calls, 'TSLA_calls', kafka_producer)

            puts = get_trades(class_ = puts_class)
            publish(puts, 'TSLA_puts', kafka_producer)
        finally:
            # Release the connection even when a scrape or publish raised.
            kafka_producer.close()
    sleep(SCRAPE_INTERVAL_S)

print('Exiting')
    

ssfully.
Message published successfully.
Message published successfully.
Message published successfully.
Message published successfully.
Message published successfully.
Message published successfully.
Message published successfully.
Message published successfully.
Message published successfully.
Message published successfully.
Message published successfully.
Message published successfully.
Message published successfully.
Message published successfully.
Message published successfully.
Message published successfully.
Message published successfully.
Message published successfully.
Message published successfully.
Message published successfully.
Message published successfully.
Message published successfully.
Message published successfully.
Message published successfully.
Message published successfully.
Accessing list
Message published successfully.
Message published successfully.
Message published successfully.
Message published successfully.
Message published successfully.
Message publishe

In [5]:
# Read the 'TSLA_puts' topic from the earliest available offset.
# consumer_timeout_ms ends the iteration once no message arrives for 1 second,
# so the cell terminates instead of blocking forever.
consumer = KafkaConsumer(
    'TSLA_puts',
    auto_offset_reset='earliest',
    bootstrap_servers=['localhost:9092'],
    api_version=(0, 10),
    consumer_timeout_ms=1000)

try:
    for msg in consumer:
        # msg.value is the raw message payload as bytes.
        result = msg.value
        print(result)
finally:
    # Release the broker connection even if the cell is interrupted.
    consumer.close()


31.37%", "volume": "929", "open_interest": "264", "implied_volatility": "83.51%"}'
b'{"contract": "TSLA210430P00640000", "last_trade_dt": "2021-04-23 3:59PM EDT", "strike": "640.00", "last_price": "4.95", "bid": "4.80", "ask": "5.05", "change": "-2.27", "pct_change": "-31.44%", "volume": "759", "open_interest": "1,382", "implied_volatility": "82.64%"}'
b'{"contract": "TSLA210430P00642500", "last_trade_dt": "2021-04-23 3:54PM EDT", "strike": "642.50", "last_price": "5.45", "bid": "5.00", "ask": "5.35", "change": "-1.95", "pct_change": "-26.35%", "volume": "67", "open_interest": "250", "implied_volatility": "82.12%"}'
b'{"contract": "TSLA210430P00645000", "last_trade_dt": "2021-04-23 3:57PM EDT", "strike": "645.00", "last_price": "5.34", "bid": "5.20", "ask": "5.60", "change": "-2.81", "pct_change": "-34.48%", "volume": "427", "open_interest": "823", "implied_volatility": "81.42%"}'
b'{"contract": "TSLA210430P00647500", "last_trade_dt": "2021-04-23 3:54PM EDT", "strike": "647.50", "last_