## Scheduler to download files from an FTP server and save the data to a MongoDB database

In [1]:
### Import statements

In [None]:
import csv
import datetime
import http.client as http
import json
import os
import re
import time
import zipfile
from ftplib import FTP
from zipfile import ZipFile

import pandas as pd
import pandas_market_calendars as mcal
import pymongo
import schedule # use to make schedule of getting data 
from pandas_datareader.data import DataReader as dr
from pymongo import MongoClient

pd.set_option("display.max_rows", 1000)
pd.set_option("display.max_columns", 1000)

### config database

In [None]:
# Client for the MongoDB server running on this machine.
client = MongoClient(host='localhost', port=27017)

# Database handles: ``db`` is "investment", ``DB`` is "HistoricalData".
db = client['investment']
DB = client['HistoricalData']

# Name of the collection that holds the treasury interest rates.
collectionTreasury = 'TreasuryData'

In [None]:
# get the NYSE calendar

In [None]:
# Calendar object for the exchange named 'NYSE', as provided by
# pandas_market_calendars.
nyse = mcal.get_calendar('NYSE')

In [None]:
# Both bounds are today's date formatted as YYYY-MM-DD, so the window below
# spans a single day.
start_date = datetime.datetime.today().strftime('%Y-%m-%d')
end_date   = datetime.datetime.today().strftime('%Y-%m-%d')

# Schedule the NYSE calendar reports for that one-day window.
early = nyse.schedule(start_date=start_date, end_date=end_date)
# Daily timestamps derived from that schedule.
# NOTE(review): this is evaluated once, when this statement runs; it is not
# refreshed on later days while the process keeps running.
business_days = mcal.date_range(early, frequency='1D')


# ##get treasury data from database

In [None]:
## download daily treasury data
def get_treasury(db):
    """Fetch the latest 6-month Treasury rate from FRED and store it.

    Downloads the ``DGS6MO`` series through ``pandas_datareader``, picks the
    most recent observation that actually carries a value, inserts it into
    ``db.TreasuryData`` as ``{'Interest_rate': ..., 'Date': ...}`` and
    returns the rate.

    Args:
        db: Database handle exposing a ``TreasuryData`` collection.

    Returns:
        The most recent non-missing value of the series.

    Raises:
        ValueError: If the download contains no non-missing observation.
    """
    syms = ['DGS6MO']
    yc = dr(syms, 'fred') # could specify start date with start param here
    yc = yc.rename(columns=dict(zip(syms, ['6m'])))

    # The series can contain missing observations; if the newest row is one
    # of them, taking ``iloc[-1]`` blindly would store and return NaN.
    observed = yc[['6m']].dropna(subset=['6m']).reset_index()
    if observed.empty:
        raise ValueError('FRED returned no usable DGS6MO observations')

    latest = observed.iloc[-1]
    Interest_rate = latest['6m']
    dd = pd.DataFrame({'Interest_rate': [Interest_rate], 'Date': [latest['DATE']]})
    db.TreasuryData.insert_many(dd.to_dict('records'), ordered=True)
    print('treasury data uploaded')
    return Interest_rate

In [None]:
### connect to the FTP server

In [None]:

# make a connection to ftp server
# Open an FTP connection to the data host and log in.
# NOTE(review): the user name and password are hard-coded string literals -
# presumably placeholders; confirm how real credentials are supplied.
ftp = FTP('L3.deltaneutral.com')
ftp.login(user='UserName', passwd = 'UserPassword')

### grab zip file

In [None]:
# get zip file list
# Names returned by the server for its current directory.
filenames = ftp.nlst()
# The last entry of that listing is the file that will be downloaded.
# NOTE(review): presumably the newest archive - this depends on the order in
# which the server lists files; confirm.
file = filenames[-1]

# download zip file
def grabFile(file):
    """Download ``file`` from the FTP server into the working directory.

    The remote file is retrieved in binary mode over the module-level ``ftp``
    connection and written to a local file of the same name. After a
    successful transfer the FTP session is closed with ``quit()``.

    Args:
        file: Name of the remote file; also used as the local file name.

    Returns:
        The file name that was passed in.
    """
    filename = file
    print(filename)
    # ``with`` guarantees the local file is flushed and closed even when the
    # transfer raises part-way through.
    with open(str(filename), 'wb') as localfile:
        ftp.retrbinary('RETR ' + filename, localfile.write, 1024)
    # Closes the shared connection, so it cannot be reused for another
    # download afterwards.
    ftp.quit()
    return (filename)


### label the data

In [None]:
# Expiration labelling
def expiration_group_def(data):
    """Label every row with an expiration bucket based on days to expiry.

    Adds a ``date_diff`` column (whole days between ``quote_date`` and
    ``expiration``, stored as float) to ``data`` in place, then returns a new
    frame that additionally carries the categorical ``expiration_group``
    column. Buckets are right-inclusive, e.g. 7 days falls in ``'0_week'``
    and 8 days in ``'1_week'``.

    Args:
        data: Frame with datetime columns ``expiration`` and ``quote_date``.

    Returns:
        ``data`` plus the ``expiration_group`` column.
    """
    # ``astype('timedelta64[D]')`` is rejected by pandas >= 2.0; ``.dt.days``
    # works across versions. The float cast keeps the dtype that the old
    # conversion produced.
    elapsed = data['expiration'] - data['quote_date']
    data['date_diff'] = elapsed.dt.days.astype('float64')

    # bins Creations for expiration_group
    bins_duration = [-float("inf"), 7, 14, 30, 60, 90, 180, 365, 547, 730, float("inf")] # Upto the number of days
    labels_duration = ['0_week','1_week', '2_week', '1_month', '2_month', '3_month', '6_month', '1_year', '1.5_year', '2_year']
    cat_duration = pd.cut(data['date_diff'], bins=bins_duration, labels=labels_duration)
    Cat_duration = cat_duration.to_frame(name='expiration_group')
    return pd.concat([data, Cat_duration], axis=1)

def strike_group_def(data):
    """Label every row with a strike bucket relative to the underlying price.

    Writes ``Strike_diff`` (``1 - strike / underlying_bid_1545``) into
    ``data`` in place and returns a new frame that also contains the
    categorical ``Strike_group`` column. Buckets are right-inclusive.
    """
    moneyness = data['strike'] / data['underlying_bid_1545']
    data['Strike_diff'] = (1 - moneyness)

    # Bucket edges and the label attached to each interval between them.
    edges = [
        -float("inf"),
        -.70, -.60, -.50, -.40, -.30, -.20, -.10,
        0,
        .10, .20, .30, .40, .50, .60, .70, 0.80,
        float("inf"),
    ]
    names = [
        'minus_other',
        'minus_70', 'minus_60', 'minus_50', 'minus_40',
        'minus_30', 'minus_20', 'minus_10',
        'mid_point',
        'plus_10', 'plus_20', 'plus_30', 'plus_40',
        'plus_50', 'plus_60', 'plus_70',
        'plus_other',
    ]
    groups = pd.cut(data['Strike_diff'], bins=edges, labels=names)
    return pd.concat([data, groups.to_frame(name='Strike_group')], axis=1)

## interest rate joiner

In [None]:

def interest_rate_joiner(data, db, collectionTreasury):
    """Attach an ``Interest_rate`` column to ``data`` by quote date.

    Looks up the treasury documents whose ``Date`` lies between the smallest
    and largest ``quote_date`` in ``data`` and left-merges them onto the
    rows. When the collection has nothing for that range, the latest rate is
    fetched through ``get_treasury(db)`` and attached to the first row's
    quote date instead.

    Args:
        data: Frame with a UTC-localized ``quote_date`` column.
        db: Database handle; ``db[collectionTreasury]`` is queried.
        collectionTreasury: Name of the treasury collection.

    Returns:
        ``data`` left-merged with the rate frame (adds ``Interest_rate`` and
        ``Date``). An empty ``data`` is returned unmerged.
    """
    # Nothing to join; min()/max() of an empty column are not usable as
    # query bounds and ``iloc[0]`` below would fail.
    if data.empty:
        return data.copy()

    quote_dates = data['quote_date']
    query = {
        'Date': {
            '$gte': quote_dates.min(),
            '$lte': quote_dates.max()
        }
    }

    # get_treasury() stores the rate under 'Interest_rate', while this lookup
    # used to read only 'Treasury'; request both so either kind of document
    # contributes a rate.
    project = {
        '_id': 0,
        'Treasury': 1,
        'Interest_rate': 1,
        'Date': 1,
    }
    interest_rates = pd.DataFrame(list(db[collectionTreasury].find(query, project)))

    if interest_rates.shape[0] != 0:
        interest_rates = interest_rates.dropna(subset=['Date'])

        # Coalesce the two possible rate fields, preferring 'Treasury'.
        rate = pd.Series(float('nan'), index=interest_rates.index)
        for field in ('Treasury', 'Interest_rate'):
            if field in interest_rates.columns:
                rate = rate.fillna(interest_rates[field])

        # Make the stored dates comparable with the UTC quote dates whether
        # or not they already carry a timezone.
        dates = interest_rates['Date']
        if dates.dt.tz is None:
            dates = dates.dt.tz_localize('UTC')
        else:
            dates = dates.dt.tz_convert('UTC')

        interest_rates = pd.DataFrame({'Interest_rate': rate, 'Date': dates})
        # Several documents for one date would multiply rows in the merge.
        interest_rates = interest_rates.drop_duplicates(subset=['Date'], keep='last')
    else:
        print('quote_date',data['quote_date'].iloc[0])
        Interest_rate = get_treasury(db)
        interest_rates = pd.DataFrame({'Interest_rate':[Interest_rate],'Date':[data['quote_date'].iloc[0]]})
        print(interest_rates.head())

    return data.merge(interest_rates, left_on='quote_date', right_on='Date', how='left')


### preprocess data

In [None]:
def process(data):
    """Normalize a raw options chunk and enrich it for upload.

    Steps: rename the vendor columns, drop rows without quote or expiration
    date, drop unused columns, keep puts only, parse both date columns as
    UTC timestamps, then add the expiration bucket, the strike bucket and
    the interest rate.

    The input frame is left untouched; a new frame is returned.

    Args:
        data: Raw chunk as read from the options CSV.

    Returns:
        The processed frame containing puts only.
    """
    # Note: the source header for the quote date really is ' DataDate',
    # with a leading space.
    columns={'Gamma':'gamma_1545','Ask':'ask_1545','UnderlyingPrice':'underlying_bid_1545','Vega':'vega_1545',
        'Theta':'theta_1545','OptionSymbol':'optionroot','Expiration':'expiration','UnderlyingSymbol':'symbol',
        'Last':'last','Volume':'volume','Delta':'delta_1545','Strike':'strike','OpenInterest':'openinterest',
        ' DataDate':'quote_date','IVMean':'implied_volatility_1545','Bid':'bid_1545','Type':'option'}
    data = data.rename(columns=columns)
    data = data.dropna(subset=['quote_date', 'expiration'])
    data = data.drop(['Flags','T1OpenInterest','IVBid','IVAsk','AKA'], axis=1)
    data['option'] = data['option'].map({'put': 'p', 'call': 'c'})
    # Explicit copy: the assignments below must write to an independent
    # frame rather than to a filtered view of the previous one.
    data = data.loc[data['option'] == 'p'].copy()
    print(data.shape)
    data['quote_date'] = pd.to_datetime(data['quote_date'], format="%m/%d/%Y").dt.tz_localize('UTC')
    data['expiration'] = pd.to_datetime(data['expiration'], format="%m/%d/%Y").dt.tz_localize('UTC')
    data = expiration_group_def(data)
    data = strike_group_def(data)
    data = interest_rate_joiner(data, db, collectionTreasury)

    return(data)

### upload data to the database using chunked uploads

In [None]:
def upload_options_data(db,DB,path):
    """Upload an options CSV to ``DB.OptionsDataTEMP`` in chunks.

    The file is streamed in chunks of 10000 rows. Every chunk goes through
    ``process`` and is inserted with ``insert_many``. A failed bulk write is
    reported and the upload continues with the next chunk.

    Args:
        db: Unused here; kept for call compatibility.
        DB: Database handle exposing the ``OptionsDataTEMP`` collection.
        path: Path of the CSV file to upload.

    Returns:
        An empty tuple.
    """
    print("Data uploading")
    select = 10000
    rows_read = 0
    rows_added = 0

    # Streaming with ``chunksize`` replaces the manual ``skiprows`` window.
    # That window re-read the last row of the previous chunk, never advanced
    # after a failed insert, and needed a full read just to count rows.
    for chunk in pd.read_csv(path, chunksize=select):
        print('Skip: '+str(rows_read)+', rows_added: '+str(rows_added))
        rows_read = rows_read + len(chunk)

        Data = process(chunk).to_dict('records')
        if not Data:
            # insert_many() rejects an empty document list.
            continue

        try:
            DB.OptionsDataTEMP.insert_many(Data, ordered=True)
        except pymongo.errors.BulkWriteError as e:
            print(e.details['writeErrors'])
            # Documents written before the failure still count.
            rows_added = rows_added + e.details.get('nInserted', 0)
        else:
            rows_added = rows_added + len(Data)
            print("Added "+str(rows_added)+" rows.")

    print(" count : "+str(rows_read))
    print('option data upload complete')
    return ()
# L3_optionstats_20180807.csv

def csv_length(path, chunksize=100000):
    """Return the number of data rows (header excluded) in a CSV file.

    The file is counted chunk by chunk so that it never has to be held in
    memory as a whole.

    Args:
        path: Path of the CSV file.
        chunksize: Number of rows parsed per chunk while counting.

    Returns:
        The row count as an int.
    """
    return sum(len(chunk) for chunk in pd.read_csv(path, chunksize=chunksize))

def pricing_data(data):
    """Prepare a raw stock-quote chunk for upload.

    Works on ``data`` in place: renames ``quotedate`` to ``Date``, parses it
    (month/day/year) into UTC timestamps and drops every row that contains a
    missing value. The same frame object is returned.
    """
    data.rename(columns={'quotedate': 'Date'}, inplace=True)

    parsed = pd.to_datetime(data['Date'], format="%m/%d/%Y")
    data['Date'] = parsed.dt.tz_localize('UTC')

    data.dropna(inplace=True)
    return data

def upload_price_data(db,DB,path):
    """Upload a stock-quote CSV to ``DB.PricingDataTEMP`` in chunks.

    The file is streamed in chunks of 10000 rows. Every chunk goes through
    ``pricing_data`` and is inserted with ``insert_many``. A failed bulk
    write is reported and the upload continues with the next chunk.

    Args:
        db: Unused here; kept for call compatibility.
        DB: Database handle exposing the ``PricingDataTEMP`` collection.
        path: Path of the CSV file to upload.

    Returns:
        None.
    """
    print("Data uploading")
    select = 10000
    rows_read = 0
    rows_added = 0

    # Streaming with ``chunksize`` replaces the manual ``skiprows`` window.
    # That window re-read the last row of the previous chunk, never advanced
    # after a failed insert, and needed a full read just to count rows.
    for chunk in pd.read_csv(path, chunksize=select):
        print('Skip: '+str(rows_read)+', rows_added: '+str(rows_added))
        rows_read = rows_read + len(chunk)

        Data = pricing_data(chunk).to_dict('records')
        if not Data:
            # insert_many() rejects an empty document list.
            continue

        try:
            DB.PricingDataTEMP.insert_many(Data, ordered=True)
        except pymongo.errors.BulkWriteError as e:
            print(e.details['writeErrors'])
            # Documents written before the failure still count.
            rows_added = rows_added + e.details.get('nInserted', 0)
        else:
            rows_added = rows_added + len(Data)
            print("Added "+str(rows_added)+" rows.")

    print(" count : "+str(rows_read))
    # Reported once, after the last chunk, not once per chunk.
    print('pricing data upload complete')
    return

### Schedule the automatic job at a fixed time

In [None]:
def job():
    """Download today's archive and upload its contents on NYSE trading days.

    On a trading day the archive named by the module-level ``file`` is
    fetched with ``grabFile`` and every member is extracted into
    ``./daily_file``. Members whose path matches ``L3_options_`` are
    uploaded with ``upload_options_data``; members matching
    ``L3_stockquotes_`` are uploaded with ``upload_price_data``. Uploaded
    files are deleted afterwards. On any other day only a message is
    printed.
    """
    today = datetime.datetime.today().strftime('%Y-%m-%d')

    # Ask the calendar on every run. A day list built once at start-up only
    # describes the start-up day, so every later run would be skipped.
    if nyse.schedule(start_date=today, end_date=today).empty:
        print('today is not business day')
        return

    # NOTE(review): ``file`` is chosen once at start-up and grabFile() closes
    # the shared FTP connection, so a second run in the same process
    # presumably needs a fresh connection and listing - confirm.
    filename = grabFile(file)
    print('filename:',filename)

    outpath = "./daily_file"
    # ``with`` closes the archive even if an upload raises.
    with zipfile.ZipFile(filename) as z:
        # unzip file and extract in folder
        for name in z.namelist():
            z.extract(name, outpath)
            extracted = outpath+'/'+name
            # Both patterns are tested against the same extracted path.
            if re.match("(.*)L3_options_(.*)", extracted):
                upload_options_data(db,DB,path=extracted)
                os.remove(extracted)
                print('options data uploaded')
            elif re.match("(.*)L3_stockquotes_(.*)", extracted):
                upload_price_data(db,DB,path=extracted)
                print('pricing data uploaded')
                os.remove(extracted)

# Register job() with the scheduler to run once a day at 17:28.
schedule.every().day.at("17:28").do(job)

# Poll forever: once per second the scheduler is asked to run whatever is
# pending. Nothing placed after this loop is ever reached.
while True:
    schedule.run_pending()
    time.sleep(1)


In [None]:

######################################################
######################################################
# NOTE: a disabled, class-based draft of this scheduler (``ftp_daily_optionsdata``)
# used to be kept here inside a bare triple-quoted string. It was never
# executed, it duplicated the functions defined earlier in this file, and it
# embedded an FTP user name and password in plain text. It has been removed.
# NOTE(review): if those credentials were ever valid, rotate them and purge
# them from the repository history.