# Big Data Project
## Outcome: Save Divergence

In this notebook we add a 'Divergence' column to the dataset.

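'Divergence' will be used to detect matches with a large difference between the quotas offered by different bet houses. For each quota type (H, D, A), it is the largest percentage deviation of any bet house's quota from the mean quota across bet houses; a match's divergence is the maximum over the three quota types.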

In [1]:
import os
import os.path
import sys
import shutil
import datetime
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import * 
from pyspark.sql.functions import udf

# Create (or reuse) the Spark session used by every cell below
spark = (
    SparkSession.builder
    .appName("Bets Exploring")
    .getOrCreate()
)
sc = spark.sparkContext

sc

In [2]:
# Data folders: raw input, interim (cleaned) input, processed output
pathori = "../Data/Raw/Main/"
pathtemp = "../Data/Interim/"
pathdest = "../Data/Processed/"

# Interim input files and the processed files written by this notebook
filetempmain = "main_competitions_consistent.csv"
filetemprecent = "main_competitions_consistent_recent.csv"
filedestmain = "main_competitions.csv"
filedestrecent = "main_competitions_recent.csv"

# Bet-house column prefixes and quota-type column suffixes
# (a quota column is named <bethouse><quotatype>, e.g. 'B365H')
bethouses = "B365 BS BW GB IW LB SB SJ VC WH".split()
quotatypes = list("HDA")

In [3]:
# Select the bet houses whose columns are present in the DataFrame

def filterBetHouses(df, bethouses):
    """Return the bet houses that have a '<bethouse>H' column in ``df``.

    Only the 'H' column is checked. The order (and any repetition) of
    ``bethouses`` is preserved in the result.
    """
    available = set(df.columns)
    return [house for house in bethouses if house + 'H' in available]

In [4]:
# Calculate the divergence of an individual match row

def calcDivergence(match, bethouses, quotatypes=('H', 'D', 'A')):
    """Return the largest percentage deviation of a bet-house quota from the mean.

    For each quota type, the non-None values of the columns
    ``<bethouse><quotatype>`` are collected and averaged. The divergence for
    that quota type is the larger of ``(max / mean - 1) * 100`` and
    ``|min / mean - 1| * 100``. The overall result is the maximum over all
    quota types.

    Args:
        match: a row supporting ``match[column_name]`` lookups (e.g. a pyspark
            Row or a dict). Missing columns are not caught.
        bethouses: bet-house column prefixes to compare.
        quotatypes: quota-type column suffixes; defaults to ('H', 'D', 'A').

    Returns:
        float: the divergence in percent, or 0.0 when no usable quota exists.
    """
    divergence = 0.0
    for quotatype in quotatypes:
        quotas = [match[bethouse + quotatype] for bethouse in bethouses]
        quotas = [quota for quota in quotas if quota is not None]
        if not quotas:
            continue
        mean = sum(quotas) / len(quotas)
        if mean <= 0:
            # Quotas are expected to be positive; a zero mean would otherwise
            # raise ZeroDivisionError, so this quota type is skipped.
            continue
        # Use the real extremes instead of fixed sentinels: the previous
        # ``min = 1000`` start gave a wrong result when all quotas exceeded 1000.
        diverMax = (max(quotas) / mean - 1) * 100
        diverMin = abs(min(quotas) / mean - 1) * 100
        divergence = max(divergence, diverMax, diverMin)
    return divergence

In [5]:
# Flatten a (row, divergence) pair into a single list of values

def reformatRow(match):
    """Return the values of ``match[0]`` as a list with ``match[1]`` appended.

    ``match[0]`` is iterated value by value, so ``match[1]`` always ends up
    as the last element.
    """
    row, divergence = match[0], match[1]
    return list(row) + [divergence]

In [6]:
# Turn an RDD of (row, divergence) pairs into an RDD of flat value lists,
# with the divergence as the last element of each list

def reformatRDD(rdd):
    """Return ``rdd`` with every element passed through ``reformatRow``."""
    return rdd.map(reformatRow)

In [7]:
def createDataFrame(rdd, schemaIni):
    """Create a DataFrame from ``rdd`` with ``schemaIni`` plus a 'Divergence' column.

    Args:
        rdd: RDD of value lists, each ending with the divergence value.
        schemaIni: schema of the source DataFrame; its fields are kept in order.

    Returns:
        A DataFrame created through the notebook-level ``spark`` session, whose
        schema has a trailing nullable DoubleType field named 'Divergence'.
    """
    divergenceField = StructField('Divergence', DoubleType(), True)
    schema = StructType(list(schemaIni) + [divergenceField])
    return spark.createDataFrame(rdd, schema)

In [8]:
# Save a Dataframe to a single CSV file, merging the partition files Spark writes

def saveDFtoCSV(df, pathdest, filedest):
    """Write ``df`` as CSV and merge the partition files into ``pathdest/filedest``.

    Spark writes one ``part-*.csv`` file per partition into ``pathdest/temp``
    (that folder is deleted first if it exists). With ``header="true"``, Spark
    writes the header line into every non-empty part file, so only the first
    header is kept when merging. Part files are merged in sorted name order so
    partitions appear in a stable order. Any existing destination file is
    overwritten. The ``temp`` folder is left in place.

    Args:
        df: DataFrame to save.
        pathdest: destination folder.
        filedest: name of the merged CSV file inside ``pathdest``.
    """
    path = os.path.join(pathdest, "temp")
    if os.path.exists(path):
        shutil.rmtree(path)

    df.write.save(path, format="csv", header="true")

    urldest = os.path.join(pathdest, filedest)
    header_written = False
    # "wb" truncates any previous merged file; merging in Python avoids building
    # a shell command from unquoted paths.
    with open(urldest, "wb") as out:
        for file in sorted(os.listdir(path)):
            if not file.endswith(".csv"):
                continue
            print(file)
            with open(os.path.join(path, file), "rb") as part:
                header = part.readline()
                if not header:
                    # Empty partition file: nothing to merge.
                    continue
                if not header_written:
                    out.write(header)
                    header_written = True
                shutil.copyfileobj(part, out)

In [10]:
# Open a dataset, calculate the divergence of every row and save the result

def calcAndSaveDivergence(pathori, fileori, pathdest, filedest):
    """Read ``pathori + fileori``, append a 'Divergence' column and save it as CSV.

    The CSV is read with a header and an inferred schema, and 'Date' is cast
    to a date. Along the way the row count, sample rows, the schema and
    intermediate RDD samples are printed. The result is written to
    ``pathdest + filedest`` via saveDFtoCSV.
    """
    source = pathori + fileori
    raw = spark.read.csv(path=source, header=True, inferSchema=True)
    df = raw.withColumn("Date", raw["Date"].cast("date"))
    print("Rows   :", df.count())
    df.show(2)
    df.printSchema()

    # Only bet houses whose columns exist in this dataset take part.
    houses = filterBetHouses(df, bethouses)
    print("Bet Houses:", houses)

    pairs = df.rdd.map(lambda row: (row, calcDivergence(row, houses)))
    print("\n RDD with calculated divergence:")
    display(pairs.take(2))

    print("RDD adding divergence as last column:")
    flat = reformatRDD(pairs)
    print(flat.take(2))

    print("\nResulting Dataframe")
    result = createDataFrame(flat, df.schema)
    result.show(2)

    print("Save Dataframe as CSV")
    saveDFtoCSV(result, pathdest, filedest)
    print("Created file:", pathdest + filedest)

In [11]:
# Calculate divergence for the recent-matches dataset

calcAndSaveDivergence(
    pathori=pathtemp,
    fileori=filetemprecent,
    pathdest=pathdest,
    filedest=filedestrecent,
)

Rows   : 11374
+-------+-------------+---------+---+----------+---------+----------+---+-----+-----+-----+----+---+----+---+---+---+----+---+----+----+---+----+----+----+----+
|Country|  Competition|   Season|Div|      Date| HomeTeam|  AwayTeam|FTR|B365H|B365D|B365A| BWH|BWD| BWA|IWH|IWD|IWA| LBH|LBD| LBA| VCH|VCD| VCA| WHH| WHD| WHA|
+-------+-------------+---------+---+----------+---------+----------+---+-----+-----+-----+----+---+----+---+---+---+----+---+----+----+---+----+----+----+----+
|Belgium|JupilerLeague|2017-2018| B1|2017-07-28|  Antwerp|Anderlecht|  D| 5.75|  3.8|  1.6| 5.0|4.0|1.67|4.7|3.6|1.7| 5.2|3.7|1.61| 5.5|3.9|1.62|4.75|3.75|1.67|
|Belgium|JupilerLeague|2017-2018| B1|2017-07-29|Charleroi|  Kortrijk|  H| 1.62| 3.75|  5.5|1.67|3.7| 5.5|1.6|3.7|5.4|1.61|3.6| 5.2|1.65|3.8|5.25|1.65|3.75| 4.8|
+-------+-------------+---------+---+----------+---------+----------+---+-----+-----+-----+----+---+----+---+---+---+----+---+----+----+---+----+----+----+----+
only showing top 2 

[(Row(Country='Belgium', Competition='JupilerLeague', Season='2017-2018', Div='B1', Date=datetime.date(2017, 7, 28), HomeTeam='Antwerp', AwayTeam='Anderlecht', FTR='D', B365H=5.75, B365D=3.8, B365A=1.6, BWH=5.0, BWD=4.0, BWA=1.67, IWH=4.7, IWD=3.6, IWA=1.7, LBH=5.2, LBD=3.7, LBA=1.61, VCH=5.5, VCD=3.9, VCA=1.62, WHH=4.75, WHD=3.75, WHA=1.67),
  11.65048543689322),
 (Row(Country='Belgium', Competition='JupilerLeague', Season='2017-2018', Div='B1', Date=datetime.date(2017, 7, 29), HomeTeam='Charleroi', AwayTeam='Kortrijk', FTR='H', B365H=1.62, B365D=3.75, B365A=5.5, BWH=1.67, BWD=3.7, BWA=5.5, IWH=1.6, IWD=3.7, IWA=5.4, LBH=1.61, LBD=3.6, LBA=5.2, VCH=1.65, VCD=3.8, VCA=5.25, WHH=1.65, WHD=3.75, WHA=4.8),
  9.004739336492884)]

RDD adding divergence as last column:
[['Belgium', 'JupilerLeague', '2017-2018', 'B1', datetime.date(2017, 7, 28), 'Antwerp', 'Anderlecht', 'D', 5.75, 3.8, 1.6, 5.0, 4.0, 1.67, 4.7, 3.6, 1.7, 5.2, 3.7, 1.61, 5.5, 3.9, 1.62, 4.75, 3.75, 1.67, 11.65048543689322], ['Belgium', 'JupilerLeague', '2017-2018', 'B1', datetime.date(2017, 7, 29), 'Charleroi', 'Kortrijk', 'H', 1.62, 3.75, 5.5, 1.67, 3.7, 5.5, 1.6, 3.7, 5.4, 1.61, 3.6, 5.2, 1.65, 3.8, 5.25, 1.65, 3.75, 4.8, 9.004739336492884]]

Resulting Dataframe
+-------+-------------+---------+---+----------+---------+----------+---+-----+-----+-----+----+---+----+---+---+---+----+---+----+----+---+----+----+----+----+-----------------+
|Country|  Competition|   Season|Div|      Date| HomeTeam|  AwayTeam|FTR|B365H|B365D|B365A| BWH|BWD| BWA|IWH|IWD|IWA| LBH|LBD| LBA| VCH|VCD| VCA| WHH| WHD| WHA|       Divergence|
+-------+-------------+---------+---+----------+---------+----------+---+-----+-----+-----+----+---+----+---+---+---+----+---+----+----

In [12]:
# Calculate divergence for the complete dataset

calcAndSaveDivergence(
    pathori=pathtemp,
    fileori=filetempmain,
    pathdest=pathdest,
    filedest=filedestmain,
)

Rows   : 106583
+-------+-------------+---------+---+----------+-----------+--------+---+-----+-----+-----+----+----+----+----+----+----+---+---+----+----+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|Country|  Competition|   Season|Div|      Date|   HomeTeam|AwayTeam|FTR|B365H|B365D|B365A| BSH| BSD| BSA| BWH| BWD| BWA|GBH|GBD| GBA| IWH|IWD|IWA| LBH| LBD| LBA| SBH| SBD| SBA| SJH| SJD| SJA| VCH| VCD| VCA| WHH| WHD| WHA|
+-------+-------------+---------+---+----------+-----------+--------+---+-----+-----+-----+----+----+----+----+----+----+---+---+----+----+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|Belgium|JupilerLeague|2003-2004| B1|2003-08-08|Club Brugge|    Genk|  H|  1.4| 3.75|  7.0|null|null|null|null|null|null|1.4|3.8|6.85|1.45|3.8|5.4|null|null|null|1.44|3.75| 6.5|null|null|null|null|null|null|null|null|null|
|Belgium|JupilerLeague|2003-2004| B1|2003-09-08| Anderlecht| Antwerp|  H|1.167|  6.0| 10.0|n

[(Row(Country='Belgium', Competition='JupilerLeague', Season='2003-2004', Div='B1', Date=datetime.date(2003, 8, 8), HomeTeam='Club Brugge', AwayTeam='Genk', FTR='H', B365H=1.4, B365D=3.75, B365A=7.0, BSH=None, BSD=None, BSA=None, BWH=None, BWD=None, BWA=None, GBH=1.4, GBD=3.8, GBA=6.85, IWH=1.45, IWD=3.8, IWA=5.4, LBH=None, LBD=None, LBA=None, SBH=1.44, SBD=3.75, SBA=6.5, SJH=None, SJD=None, SJA=None, VCH=None, VCD=None, VCA=None, WHH=None, WHD=None, WHA=None),
  16.11650485436893),
 (Row(Country='Belgium', Competition='JupilerLeague', Season='2003-2004', Div='B1', Date=datetime.date(2003, 9, 8), HomeTeam='Anderlecht', AwayTeam='Antwerp', FTR='H', B365H=1.167, B365D=6.0, B365A=10.0, BSH=None, BSD=None, BSA=None, BWH=None, BWD=None, BWA=None, GBH=1.2, GBD=5.5, GBA=9.25, IWH=1.3, IWD=4.2, IWA=8.0, LBH=None, LBD=None, LBA=None, SBH=1.2, SBD=5.5, SBA=10.0, SJH=None, SJD=None, SJA=None, VCH=None, VCD=None, VCA=None, WHH=1.22, WHD=5.0, WHA=9.5),
  19.84732824427481)]

RDD adding divergence as last column:
[['Belgium', 'JupilerLeague', '2003-2004', 'B1', datetime.date(2003, 8, 8), 'Club Brugge', 'Genk', 'H', 1.4, 3.75, 7.0, None, None, None, None, None, None, 1.4, 3.8, 6.85, 1.45, 3.8, 5.4, None, None, None, 1.44, 3.75, 6.5, None, None, None, None, None, None, None, None, None, 16.11650485436893], ['Belgium', 'JupilerLeague', '2003-2004', 'B1', datetime.date(2003, 9, 8), 'Anderlecht', 'Antwerp', 'H', 1.167, 6.0, 10.0, None, None, None, None, None, None, 1.2, 5.5, 9.25, 1.3, 4.2, 8.0, None, None, None, 1.2, 5.5, 10.0, None, None, None, None, None, None, 1.22, 5.0, 9.5, 19.84732824427481]]

Resulting Dataframe
+-------+-------------+---------+---+----------+-----------+--------+---+-----+-----+-----+----+----+----+----+----+----+---+---+----+----+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+-----------------+
|Country|  Competition|   Season|Div|      Date|   HomeTeam|AwayTeam|FTR|B365H|B365D|B365A| BSH| BSD| BSA| 