# INM432: Big Data - Coursework (Part II)

## Predicting shifts in GBP-EUR exchange rates based on the content of UK parliamentary debates: A pySpark application

### Alexandros Dimitrios Nalmpantis; Georgios Kyriakopoulos (2017)

The pySpark code presented in this notebook aims to construct a modelling process, which predicts shifts in GBP-EUR exchange rates on the basis of political narrative. More specifically, the code implements a process that, schematically speaking, involves the following steps:

(a) It scrapes the (almost daily) reports that record debates at the House of Commons and the House of Lords, as published on the UK Parliament website. The reports (available in PDF format) are converted into txt format, before removing common and usually uninformative words (i.e. stopwords), numeric figures, and punctuation symbols. File names and the remaining text content are converted into a dataframe. Following the construction of this dataframe, the content of the debate reports is tokenised and hashed, while term frequencies (TFs) and inverse document frequencies (IDFs) are calculated for the hashes.

(b) It scrapes the (almost daily) EUR-GBP exchange rates, published on the Bank of England website. The content is written to a text file, which is then processed to remove blank space and to calculate the difference between each GBP-EUR exchange-rate value and its immediately previous exchange-rate value (i.e. *the exchange-rate shift*). Dates and exchange-rate shifts are then converted into a second dataframe.

(c) It links the dataframes constructed at (a) and (b) together, so that the content of the debate reports at a certain date is appended to the shift in GBP-EUR exchange rates recorded the day after. This new dataframe serves as the analytical input on the basis of which the main analysis is conducted.

(d) Finally, it constructs a machine learning pipeline, whereby the linear regression algorithm is trained (and validated) to predict exchange-rate shifts based on either the TF or the IDF of the (tokenised and hashed) content of the debate reports. Alternative parameterisation options are explored using a grid, to help optimise prediction.

The findings of the analysis are highlighted within the annotation of this code and, broadly speaking, they appear to suggest that data-driven approaches (such as the one presented here) have the potential to detect statistical links between macroeconomic financial-market parameters and topical political narratives. It is acknowledged that time-series modelling techniques (rather than linear regression) would be a more appropriate analytical approach, given the nature of the analysis dataset. However, for the purposes of this preliminary study we are (partially) addressing the issue of autocorrelation between data points by focusing the analysis on exchange-rate shifts rather than exchange-rate values.

## 1. Modules

* The modules needed for the analysis are imported below. Some may first need to be installed from a terminal with **pip install <"name of module">** (e.g. pip install tqdm) or with **conda install <"name of module">** (e.g. conda install tqdm).

In [1]:
# Import modules for scraping links
from bs4 import BeautifulSoup
import urllib.request
import re
import datetime
from datetime import date,timedelta
import os
import numpy as np

# Import modules for downloading links
import wget
import pandas as pd

# Import modules for parsing PDFs, showing progress bars and handling warnings
import warnings
from tqdm import tnrange, tqdm_notebook
from tika import parser
warnings.filterwarnings("ignore", category=UserWarning, module='bs4')

# Import modules for spark ML and SQL (re is already imported above)
from pyspark import SparkContext
from pyspark.ml.feature import Tokenizer,HashingTF, IDF
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder,TrainValidationSplit
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import *
from pyspark.sql import *

# import various operators
from math import log
import time
from pprint import pprint
import sys
import matplotlib.pyplot as plt
from  stop_words import get_stop_words

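# filter out warnings (the warnings module is already imported above)
warnings.filterwarnings("ignore")
# Set sparkcontext as sc (getOrCreate reuses an existing context if this cell is run again)
sc=SparkContext.getOrCreate()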

## 2. Data collection and pre-processing

### 2.1 Defining and calling wrapped procedures for scraping, downloading and converting parliamentary debates to text files

In [2]:
# Define whether report data were provided ('yes' - data were provided, 'no' - data need to be scraped)
trg='yes'

# Set the start date as date(year, month, day); reports are downloaded from this date up to the current date
start_date = date(2016, 6, 23)

In [3]:
# Data control function that controls whether the data will be scraped or were provided by students
def data_control(page,start_date,trg):
    if trg=='yes':
        print ('====Data were given====')
    else:
        html_page = urllib.request.urlopen(page) #request page with urllib packages
        soup = BeautifulSoup(html_page) #pass the page to beautiful soup in order to extract the links contained in webpage 
        #print (soup) #visually inspect the html structure
        hl = [] #set hyperlink array to store the extracted links
        ##search html for hyperlinks starting with qna
        for hyperlink in soup.findAll('a', attrs={'href': re.compile("^http://qna")}): 
            hl.append(hyperlink.get('href')) #store the hyperlinks found on an array
        #    print (link.get('href'))
        
        url=[hl[1][:-22]+'Lords-',hl[1][:-22]+'Commons-'] #take first result and cut the dates and category of either lords or commons
    
        #date interval search set and downloading of the pdf files
        ##create interval search date
        today=datetime.datetime.today() #today's date set
        cur_date = date(today.year,today.month,today.day)  # set current date in format of YYYY-MM-DD
    
        dt = cur_date - start_date #calculate interval in days to use for loop
        #make the directory for the downloaded files if it does not already exist
        os.makedirs(os.getcwd()+'/parliament_practicals', exist_ok=True)
    
        #loop through the interval with a 1-day step and append the date to the url, for both the House of Lords and the House of Commons
        for ul in url:
        #    print ('Downloading: ',str('House of '+ul[112:-2]+'s'))
            for i in tnrange(dt.days + 1,desc='Downloading: '+str(ul[112:-2]+'s')):
                try: #skip dates on which a House did not convene (the HTTP request returns an error); results are stored in the folder parliament_practicals
                    filename = wget.download(ul+str(start_date + timedelta(days=i))+'.pdf',os.getcwd()+'/parliament_practicals')
                except Exception:
                    continue #move on to the next date
    
# function to convert the downloaded pdfs to text files
def convert_pdf_to_text(trg):
    if trg=='yes': # user input in case data are already given in appropriate format
        print ('====Data were given====')
    else:
        os.makedirs(os.getcwd()+'/textfiles', exist_ok=True) # make the textfiles directory if it does not already exist
        list_of_files=os.listdir(os.getcwd()+'/parliament_practicals') # create a list of pdf files to be converted
        for i in tnrange(len(list_of_files),desc='Converting pdf to txt'): # iterate through the files on the list, with a progress bar
            if not list_of_files[i].endswith(".pdf"): # skip anything that is not a pdf file
                continue
            parsedPDF=parser.from_file(os.getcwd()+'/parliament_practicals/'+list_of_files[i]) # parse pdf file
            # write the parsed text to a .txt file of the same name; mode 'w' overwrites, so re-running does not duplicate content
            with open(os.getcwd()+'/textfiles/'+list_of_files[i][:-4]+'.txt', 'w') as text_file:
                text_file.write(parsedPDF["content"] or '') # content can be None when no text was extracted
def download_practicals_convert(start_date,trg):
    page="http://www.parliament.uk/business/publications/written-questions-answers-statements/daily-reports/" # set link to parliament daily questions and answers reports
    data_control(page,start_date,trg) # call set data function
    convert_pdf_to_text(trg) # convert the downloaded pdfs to text files
    print ('==Downloading and conversion to text files completed==')
    
# Call the function to either download the data or use the data already provided. Please make sure that,
# if data are given, they are stored in the folder 'parliament_practicals'.
# We suggest running the scraping function, since downloading a year of reports only takes about 2 minutes.
download_practicals_convert(start_date,trg) 

====Data were given====
====Data were given====
==Downloading and conversion to text files completed==
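
The download loop in `data_control` appends an ISO-formatted date and `.pdf` to a base URL, once per House and per day. A minimal sketch of that step is given below. `BASE_URL` is a hypothetical placeholder (the notebook derives the real prefix from the scraped page) and `daily_report_urls` is an illustrative helper that is not part of the notebook.

```python
from datetime import date, timedelta

# Hypothetical placeholder; the notebook derives the real prefix from the scraped page.
BASE_URL = "http://example.org/daily-reports/"

def daily_report_urls(start, end, houses=("Lords", "Commons")):
    """Return one candidate PDF URL per House and per calendar day in [start, end]."""
    n_days = (end - start).days + 1
    urls = []
    for house in houses:
        for i in range(n_days):
            day = start + timedelta(days=i)
            # str(date) gives YYYY-MM-DD, the same suffix that is later read back from the file name
            urls.append(f"{BASE_URL}{house}-{day}.pdf")
    return urls

urls = daily_report_urls(date(2016, 6, 23), date(2016, 6, 25))
print(len(urls))
print(urls[0])
```

Not every candidate URL exists, because the Houses do not sit every day; this is why the notebook wraps each download in a `try`/`except` and moves on to the next date.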


### 2.2 Defining and calling wrapped procedures for scraping, downloading and converting time-series exchange-rate shifts to a dataframe

In [13]:
# Define if exchange rate data were provided ( yes - data were provided, no - data need to be scraped)
trg='no' # change this value to yes or no

In [17]:
# Clean the downloaded exchange-rate text file and transform it into a pandas dataframe of dates and exchange-rate shifts
def clean_ex(file_path):
    data = pd.read_csv(file_path,sep=" t ",header=None, encoding="ISO-8859-1") # load the text file
    remove_words=['Bank of England Statistical Interactive Database','Series 1 to 1','Spot exchange rate, Euro into Sterling','XUDLERS','Ã‚','Â','var w=window;if(w.performance||w.mozPerformance||w.msPerformance||w.webkitPerformance){var d=document;AKSB=w.AKSB||{},AKSB.q=AKSB.q||[],AKSB.mark=AKSB.mark||function(e,_){AKSB.q.push(["mark",e,_||(new Date).getTime()])},AKSB.measure=AKSB.measure||function(e,_,t){AKSB.q.push(["measure",e,_,t||(new Date).getTime()])},AKSB.done=AKSB.done||function(e){AKSB.q.push(["done",e])},AKSB.mark("firstbyte",(new Date).getTime()),AKSB.prof={custid:"445139",ustr:"",originlat:"0",clientrtt:"9",ghostip:"62.24.201.160",ipv6:false,pct:"10",clientip:"85.211.227.93",requestid:"4bf4eea",region:"16972",protocol:"",blver:13,akM:"r",akN:"",akTT:"O",akTX:"1",akTI:"4bf4eea",ai:"286733",ra:"false",pmgn:"",pmgi:"",pmp:"",qc:""},function(e){var _=d.createElement("script");_.async="async",_.src=e;var t=d.getElementsByTagName("script"),t=t[t.length-1];t.parentNode.insertBefore(_,t)}(("https:"===d.location.protocol?"https:":"http:")+"//ds-aksb-a.akamaihd.net/aksb.min.js")}']
    for word in remove_words: # replace page boilerplate with NaN
        data=data.replace(word,np.nan)
    data=data.dropna() # drop the rows that held boilerplate
    data=data.reset_index(drop=True) # reset indices (the result must be assigned to take effect)
    dates=data.iloc[::2].reset_index(drop=True) # rows at even positions (0, 2, 4, ...) hold the dates
    rates=data.iloc[1::2].reset_index(drop=True) # rows at odd positions (1, 3, 5, ...) hold the rates
    x=pd.concat([rates,dates],axis=1) # put rates and dates side by side
    x.columns=['Rate','Date'] # rename columns
    x=x.dropna() # drop nan
    x=x.drop_duplicates('Date') # drop duplicates
    x['Date'] = pd.to_datetime(x['Date']) # convert date column to date
    x[['Rate']] = x[['Rate']].apply(pd.to_numeric) # convert exchange rate to float
    x = x.set_index('Date').diff() # calculate rate(t) - rate(t-1)
    x.reset_index(inplace=True) # restore the Date column
    x['Date']=x['Date'].dt.strftime('%Y-%m-%d') # convert to string for join matching operations
    x.Rate = x.Rate.shift(-1) # move each shift up one row, so that date t carries rate(t+1) - rate(t); this accounts for the delay of the parliament report
    x=x.dropna() # drop na (the last date has no following rate)
    x.to_csv(os.getcwd()+'/xr/exchangeRates_diff.csv') # save to csv file
    return x # return dataframe

def download_xr(trg):
    xr_dir=os.getcwd()+'/xr'
    xr_file=xr_dir+'/exchangeRates.txt'
    if trg=='yes': # user input in case data are already given in appropriate format
        print ('====Data were given====')
    else:
        os.makedirs(xr_dir, exist_ok=True) # make the xr directory if it does not already exist
        # request the page only when the data actually need to be scraped
        html= urllib.request.urlopen("http://www.bankofengland.co.uk/boeapps/iadb/fromshowcolumns.asp?Travel=NIxIRxSUx&FromSeries=1&ToSeries=50&DAT=RNG&FD=1&FM=Jan&FY=1963&TD=11&TM=Apr&TY=2017&VFD=Y&CSVF=TT&C=C8J&Filter=N&html.x=11&html.y=9")
        soup_xr = BeautifulSoup(html)
        xr = soup_xr.get_text()
        #print(xr)
        with open(xr_file, 'w') as text_xr: # mode 'w' overwrites, so re-running does not append a second copy
            text_xr.write(xr)
    data=clean_ex(xr_file) # clean the text file in both cases
    return data

data_xr = download_xr(trg)
print(data_xr.head()) # print head

         Date    Rate
0  1999-01-04 -0.0019
1  1999-01-05  0.0082
2  1999-01-06  0.0007
3  1999-01-07  0.0068
4  1999-01-08  0.0011
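
The `diff()` followed by `shift(-1)` in `clean_ex` can be hard to follow at a glance. The sketch below reproduces the same alignment in plain Python, under the assumption that the rows are already sorted by date. The helper name `next_day_shifts` and the rates are made up for illustration.

```python
def next_day_shifts(dates, rates):
    """Map each date t to rate(t+1) - rate(t); the last date has no successor and is dropped."""
    shifts = {}
    for i in range(len(rates) - 1):
        shifts[dates[i]] = round(rates[i + 1] - rates[i], 4)
    return shifts

# Made-up rates, for illustration only
dates = ["2017-01-02", "2017-01-03", "2017-01-04", "2017-01-05"]
rates = [1.1700, 1.1750, 1.1720, 1.1720]

shifts = next_day_shifts(dates, rates)
print(shifts)
```

Note that "t+1" here means the next row in the series, i.e. the next date for which a rate was published, which is not necessarily the next calendar day.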


## 3. Pipeline

### Task A: Select a dataset and make initial load and transformations

In [18]:
# Create dataframe of filename - text with numbers and punctuation removed

# Set stop word parameters - the stop_words package was used for stopword removal
# The StopWordsRemover from pyspark.ml.feature was also tested extensively, but it proved ineffective or buggy in our tests
stop_words = get_stop_words('english')

def remove_n_p(text): # function that removes numbers, bracketed passages, punctuation and stopwords, and lowercases the text
    text = re.sub(r'\d+','', text) # remove numbers with a regular expression
    text = re.sub(r'\[.*?\]|\(.*?\)|\W+', ' ', text) # remove bracketed passages and punctuation with a regular expression
    text=text.lower() # lowercase the text
    text = ' '.join([word for word in text.split() if word not in stop_words]) # remove stopwords
    return text

# extract date from filename function 
def trim_filename(filename):
    date=filename[-14:-4] # extract the timestamp from end of the file
    return date # return date
    
# sparksession added for spark dataframes
spark = SparkSession.builder.getOrCreate()

def make_dataFrame(dirPath): # make a dataFrame with filename and text 
    ft_RDD = sc.wholeTextFiles(dirPath) # create an RDD of (file path, file content) pairs with wholeTextFiles
    spm_t_RDD = ft_RDD.map(lambda ft: (trim_filename(ft[0]), remove_n_p(ft[1]))) # create RDD with filename and call remove_n_p function to text
    file_text_df = spark.createDataFrame(spm_t_RDD,schema=['id','text']) # create a dataFrame - filename - text
    return file_text_df

In [19]:
# Convert currency pandas dataframe to Spark Dataframe and specify datatypes

def currency_df(data_xr):# function to create dataframe
    data_xr_DF=spark.createDataFrame(data_xr,schema=['Date','Rate']) # create dataframe
    data_xr_DF=data_xr_DF.withColumn('Rate', data_xr_DF['Rate'].cast('float'))# convert rate to float
    return data_xr_DF # return spark dataframe

# function to join the two dataframes on dates; the exchange-rate shifts have been moved back by one day
# in order to account for the delay of the parliament publication
def connect_xr_df(file_text_df,data_xr_DF): # takes the two inputs file_text_df and data_xr_DF from the previous functions
    file_text_Date_rate=file_text_df.join(data_xr_DF,file_text_df.id==data_xr_DF.Date,'leftouter') # left outer join: attach the exchange-rate shift to each parliament file by matching its date
    file_text_Date_rate.createOrReplaceTempView("temp") # create a temporary sql view
    file_text_Date_rate_sql = spark.sql("SELECT id,text,Rate as label FROM temp") # select the three columns required for analysis and rename Rate to label
    file_text_Date_rate_sql.show(5)
    return file_text_Date_rate_sql # return the dataframe for analysis

id_text_label=connect_xr_df(make_dataFrame(os.getcwd()+'/textfiles'),currency_df(data_xr))

+----------+--------------------+-------+
|        id|                text|  label|
+----------+--------------------+-------+
|2017-02-24|daily report frid...|-0.0069|
|2017-02-24|friday february p...|-0.0069|
|2016-07-06|daily report wedn...| 0.0041|
|2016-07-06|wednesday july p ...| 0.0041|
|2017-03-08|daily report wedn...|-0.0044|
+----------+--------------------+-------+
only showing top 5 rows
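
Because the join is a left outer join, a report whose date has no matching exchange-rate shift is kept with an empty label. The plain-Python sketch below illustrates this behaviour on made-up rows. The helper `left_outer_join` is illustrative and not part of the notebook.

```python
def left_outer_join(reports, shifts):
    """Pair every (date, text) report with the shift for its date, or None if there is none."""
    return [(day, text, shifts.get(day)) for day, text in reports]

# Made-up rows, for illustration only
reports = [("2017-02-24", "daily report friday"),
           ("2017-02-25", "report on a day without a published rate")]
shifts = {"2017-02-24": -0.0069}

joined = left_outer_join(reports, shifts)
labelled = [row for row in joined if row[2] is not None]  # keep only rows that have a label
print(joined)
print(labelled)
```

In Spark, one way to apply the same filter would be `DataFrame.dropna(subset=['label'])` on the joined dataframe. Whether any such rows occur depends on the dates covered by the two data sources.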



### Task B: Implement a machine learning pipeline in Spark, including feature extractors, transformers, and/or selectors. 

In [29]:
#Step1: use tokenizer to split the text into an array of words
tokenizer = Tokenizer(inputCol="text", outputCol="words")
tokenizer.transform(id_text_label) # quick check only: the result is discarded (requires id_text_label from Task A)

#Step2: make hashTF sparse vector (the number of features, 500, is set in the parameter grid below)
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")

#Step 3: feed hash vector to calculate idf
idf = IDF(inputCol=hashingTF.getOutputCol(), outputCol="idf") 

#Step4: linear regression parameters
lr_tf = LinearRegression()\
     .setFeaturesCol("features")\
     .setLabelCol('label')

lr_idf = LinearRegression()\
     .setFeaturesCol("idf")\
     .setLabelCol('label')

#Step 5: configure alternative pipelines 
pipeline_tf = Pipeline(stages=[tokenizer, hashingTF, lr_tf]) #with hash vector tf
pipeline_idf = Pipeline(stages=[tokenizer, hashingTF, idf, lr_idf]) #with hash vector idf 

#Step 6: set exemplar parameter grid
paramGrid_tf = ParamGridBuilder() \
    .addGrid(hashingTF.numFeatures, [500]) \
    .addGrid(lr_tf.regParam, [0.3]) \
    .addGrid(lr_tf.maxIter, [50]) \
    .addGrid (lr_tf.elasticNetParam,[0.8])\
    .build()

paramGrid_idf = ParamGridBuilder() \
    .addGrid(hashingTF.numFeatures, [500]) \
    .addGrid(lr_idf.regParam, [0.3]) \
    .addGrid(lr_idf.maxIter, [50]) \
    .addGrid (lr_idf.elasticNetParam,[0.8])\
    .build()

NameError: name 'id_text_label' is not defined
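
To make the feature stages above more concrete, the following plain-Python sketch computes hashed term frequencies and smoothed inverse document frequencies for two toy documents. It is an illustration under stated assumptions, not Spark's implementation: `zlib.crc32` stands in for the hash function Spark uses internally, the helper names are made up, and the weighting follows the smoothed formula log((m + 1) / (df + 1)) described in the Spark documentation.

```python
import math
import zlib

def hashed_tf(tokens, num_features):
    """Count tokens into a fixed-length vector, indexing each token by a hash bucket."""
    vec = [0] * num_features
    for tok in tokens:
        # crc32 is a stand-in for the hash function Spark uses internally
        vec[zlib.crc32(tok.encode("utf-8")) % num_features] += 1
    return vec

def idf_weights(tf_vectors):
    """Smoothed IDF per bucket: log((n_docs + 1) / (doc_freq + 1))."""
    n_docs = len(tf_vectors)
    n_features = len(tf_vectors[0])
    weights = []
    for j in range(n_features):
        doc_freq = sum(1 for vec in tf_vectors if vec[j] > 0)
        weights.append(math.log((n_docs + 1) / (doc_freq + 1)))
    return weights

# Toy documents, for illustration only
docs = ["brexit trade deal trade", "budget tax trade"]
tf = [hashed_tf(d.split(), num_features=8) for d in docs]
idf = idf_weights(tf)
tfidf = [[count * w for count, w in zip(vec, idf)] for vec in tf]
print(tf)
print(tfidf)
```

A bucket that is hit in every document (here, the bucket of "trade") receives a weight of zero, so it contributes nothing to the rescaled vector. With a small number of buckets, unrelated words can collide in the same bucket, which is one reason the number of features is worth tuning.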

### Task C: Evaluate the performance of your pipeline using training and test set 

In [None]:
evaluator = RegressionEvaluator(metricName="r2")

tvs_tf = TrainValidationSplit(estimator=pipeline_tf, 
                           estimatorParamMaps=paramGrid_tf,
                           evaluator =evaluator,
                           trainRatio=0.8) # 80% of the data will be used for training, 20% for validation


# Run TrainValidationSplit, and choose the best set of parameters.
tvsModel_tf = tvs_tf.fit(id_text_label)
# Note: R squared is computed below on the full dataset, which includes the rows used for training
rsquared_tf = evaluator.evaluate(tvsModel_tf.transform(id_text_label))
print("---Linear Regression with hash TF: R Squared is %s ---" % (rsquared_tf))



tvs_idf = TrainValidationSplit(estimator=pipeline_idf, 
                           estimatorParamMaps=paramGrid_idf,
                           evaluator =evaluator,
                           trainRatio=0.8) # 80% of the data will be used for training, 20% for validation

# Run TrainValidationSplit, and choose the best set of parameters.
tvsModel_idf = tvs_idf.fit(id_text_label)
# Note: R squared is computed below on the full dataset, which includes the rows used for training
rsquared_idf = evaluator.evaluate(tvsModel_idf.transform(id_text_label))
print("---Linear Regression with hash IDF: R Squared is %s ---" % (rsquared_idf))
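
The R squared values in this task are computed on the full `id_text_label` dataframe, which includes the rows used for fitting. The sketch below shows, in plain Python and on made-up numbers, how the metric is defined and why a score on held-out rows can differ from the in-sample score. The mean predictor `fit_mean` is a toy stand-in for a fitted model, not the notebook's regression.

```python
def r_squared(labels, predictions):
    """R^2 = 1 - SS_res / SS_tot, the quantity reported by the "r2" metric."""
    mean = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, predictions))
    ss_tot = sum((y - mean) ** 2 for y in labels)
    return 1.0 - ss_res / ss_tot

def fit_mean(labels):
    """Toy 'model': always predict the mean of the training labels."""
    mean = sum(labels) / len(labels)
    return lambda n: [mean] * n

# Made-up exchange-rate shifts, for illustration only
train_labels = [0.004, -0.002, 0.001, -0.003]
test_labels = [0.002, -0.006]

predict = fit_mean(train_labels)
r2_train = r_squared(train_labels, predict(len(train_labels)))
r2_test = r_squared(test_labels, predict(len(test_labels)))
print(r2_train, r2_test)
```

In Spark, one way to obtain a held-out set would be `DataFrame.randomSplit([0.8, 0.2], seed=...)`, fitting on the first part and evaluating on the second. For time-ordered data such as these, a split by date may be a more faithful test than a random split.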


### Task D: Implement a parameter grid

In [None]:
parameters = {'Number_of_Features_hTF': [50, 100],
              'Regression_Parameters': [0.1, 0.3],
              'Regression_Iterations': [10, 20],
              'Regression_ElasticParam': [0.1, 0.3],
              'Train_Ratio':[0.6, 0.7, 0.8, 0.9],
              'Feature_Experiment':['idf','hTF']}

In [None]:
# Build parameters required for experiments

# Function that builds the pipeline for a given feature type ('hTF' or 'idf')
    
def pipeline_fc(feature): # build and return the pipeline
    # Step1: use tokenizer to split the text into an array of words
    tokenizer = Tokenizer(inputCol='text', outputCol='words')
    
    #Step2: make hashTF sparse vector
    hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol='features') # set parameters for hashing
    
    #Step 3: feed hash vector to calculate idf
    idf = IDF(inputCol=hashingTF.getOutputCol(), outputCol='idf') # set parameters for idf
    
    #Step4: linear regression parameters and if statement for idf
    
    if feature=='hTF': # input for hashing only pipeline
        lr = LinearRegression().setFeaturesCol('features').setLabelCol('label') #
        pipeline = Pipeline(stages=[tokenizer, hashingTF, lr]) 
    else: #input for hashing-idf pipeline
        lr = LinearRegression().setFeaturesCol('idf').setLabelCol('label')
        pipeline = Pipeline(stages=[tokenizer, hashingTF,idf,lr]) 
    return pipeline # return pipeline
    

#  Step 5: parameter grid set from user input on parameter dictionary
#  Note: hashingTF and lr_idf are the stage objects created in Task B; a grid only
#  affects pipelines that contain these same objects
paramGrid_exper = ParamGridBuilder() \
    .addGrid(hashingTF.numFeatures, parameters['Number_of_Features_hTF']) \
    .addGrid(lr_idf.regParam, parameters['Regression_Parameters']) \
    .addGrid(lr_idf.maxIter, parameters['Regression_Iterations']) \
    .addGrid(lr_idf.elasticNetParam, parameters['Regression_ElasticParam']) \
    .build()


#  Step 6: evaluator setting
evaluator = RegressionEvaluator(metricName="r2", labelCol="label", predictionCol="prediction")

In [None]:
#  User parameter input for the regression, and wrapping of the functions from reading the data to running the experiments

#  the parameters for the experiments can be set in the dictionary below
parameters = {'Number_of_Features_hTF': [50, 100],
              'Regression_Parameters': [0.1, 0.3],
              'Regression_Iterations': [10, 20],
              'Regression_ElasticParam': [0.1, 0.3],
              'Train_Ratio':[0.6, 0.7, 0.8, 0.9],
              'Feature_Experiment':['idf','hTF']}

#  data transformations function call for both parliament reports and currency
id_text_label=connect_xr_df(make_dataFrame(os.getcwd()+'/textfiles'),currency_df(data_xr))

#  Experiment loop changing the pipeline 

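for ratio in parameters['Train_Ratio']:
    for feature in parameters['Feature_Experiment']:
        pipeline=pipeline_fc(feature)
        # rebuild the grid from this pipeline's own stages: parameters that belong to other
        # stage objects (such as those created in Task B) are not applied when this pipeline is fitted
        stages=pipeline.getStages()
        hashing_stage, lr_stage = stages[1], stages[-1]
        paramGrid_exper = ParamGridBuilder() \
            .addGrid(hashing_stage.numFeatures, parameters['Number_of_Features_hTF']) \
            .addGrid(lr_stage.regParam, parameters['Regression_Parameters']) \
            .addGrid(lr_stage.maxIter, parameters['Regression_Iterations']) \
            .addGrid(lr_stage.elasticNetParam, parameters['Regression_ElasticParam']) \
            .build()
        print ('\n***** Experiment initiated *****')
        print ('\n Parameters: Train Ratio('+str(ratio)+'),Feature :('+str(feature)+')')
        start_time = time.time()
        tvs_exper = TrainValidationSplit(estimator=pipeline,estimatorParamMaps=paramGrid_exper,evaluator =evaluator,trainRatio=ratio)
        tvsModel_exper = tvs_exper.fit(id_text_label)
        print("--- Training-validation completed in %s seconds ---" % (time.time() - start_time))
        print(tvsModel_exper.bestModel)
        rsquared = evaluator.evaluate(tvsModel_exper.transform(id_text_label)) # evaluate once and reuse the value
        print("---Linear Regression with %s features: R Squared is %s ---" % (feature, rsquared))
        #print(list(zip(tvsModel_exper.getEstimatorParamMaps())))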
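
To get a feel for the cost of the experiment loop, the sketch below enumerates the combinations implied by the `parameters` dictionary in plain Python. The names `grid_keys`, `grid` and `outer_runs` are introduced here for illustration only.

```python
from itertools import product

parameters = {'Number_of_Features_hTF': [50, 100],
              'Regression_Parameters': [0.1, 0.3],
              'Regression_Iterations': [10, 20],
              'Regression_ElasticParam': [0.1, 0.3],
              'Train_Ratio': [0.6, 0.7, 0.8, 0.9],
              'Feature_Experiment': ['idf', 'hTF']}

# Keys that go into the parameter grid (one model is trained per combination)
grid_keys = ['Number_of_Features_hTF', 'Regression_Parameters',
             'Regression_Iterations', 'Regression_ElasticParam']
grid = [dict(zip(grid_keys, combo))
        for combo in product(*(parameters[k] for k in grid_keys))]

# Keys that are varied by the two outer for-loops
outer_runs = list(product(parameters['Train_Ratio'], parameters['Feature_Experiment']))

print(len(grid))                    # combinations per train-validation run
print(len(outer_runs))              # number of train-validation runs
print(len(grid) * len(outer_runs))  # grid combinations trained in total
```

Each added value in any of the lists multiplies the total, so it may be worth widening one parameter at a time rather than all of them at once.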