In [6]:
import numpy as np
import sys
import re
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import FloatType
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import DateType
import os
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import MinMaxScaler
from pyspark.streaming import StreamingContext

In [53]:

#import data
def read_data(spark):
    directory_path1 = os.path.join(os.getcwd(),"Full_Data_Pack_1")
    directory_path2 = os.path.join(os.getcwd(),"Full_Data_Pack_2")

    df_horse = spark.read.csv(os.path.join("horses.csv"), header=True, inferSchema=True)
    df_jockey = spark.read.csv(os.path.join("jockeys.csv"), header=True, inferSchema=True)
    df_races_sectional = spark.read.csv(os.path.join("races_sectional.csv"), header=True, inferSchema=True)
    df_trainer = spark.read.csv(os.path.join("trainer.csv"), header=True, inferSchema=True)
    df_sectional = spark.read.csv(os.path.join("sectional_table.csv"), header=True, inferSchema=True)
    df_records = spark.read.csv(os.path.join("records.csv"), header=True, inferSchema=True)
    df_races = spark.read.csv(os.path.join("races.csv"), header=True, inferSchema=True)
    df_foal = spark.read.csv(os.path.join("foal_date.csv"), header=True, inferSchema=True)
    
    return df_races, df_races_sectional, df_trainer, df_jockey, df_records, df_horse, df_sectional,df_foal

#trainer preprocessing
def trainer_preprocessing(df):
    df = df.withColumn('Total_wins',when(df["Total_wins"].isNull(),0).\
                      otherwise(df["Total_wins"]))
    df = df.withColumn('Total_second_places',when(df["Total_second_places"].isNull(),0).\
                      otherwise(df["Total_second_places"]))
    df = df.withColumn('Total_third_places',when(df["Total_third_places"].isNull(),0).\
                      otherwise(df["Total_third_places"]))
    df = df.select("Trainer_ID","Total_wins","Total_second_places","Total_third_places","Total_rides")
    df = df.select("Trainer_ID",(col("Total_wins")/col("Total_rides")).alias("trainer_first_place_ratio"),\
                                (col("Total_second_places")/col("Total_rides")).alias("trainer_second_place_ratio"),\
                                (col("Total_third_places")/col("Total_rides")).alias("trainer_third_place_ratio"),\
                                ((col("Total_wins") + col("Total_second_places") + col("Total_third_places"))/col("Total_rides")).alias("trainer_place_ratio"),\
                                ((col("Total_rides") - (col("Total_wins") + col("Total_second_places") + col("Total_third_places")))/col("Total_rides")).alias("trainer_lose_ratio"))
    return df

#jockey preprocessing
def jockey_preprocessing(df):
    df = df.withColumn('Total_wins',when(df["Total_wins"].isNull(),0).\
                      otherwise(df["Total_wins"]))
    df = df.withColumn('Total_second_places',when(df["Total_second_places"].isNull(),0).\
                      otherwise(df["Total_second_places"]))
    df = df.withColumn('Total_third_places',when(df["Total_third_places"].isNull(),0).\
                      otherwise(df["Total_third_places"]))
    df = df.select("Jockey_ID","Total_wins","Total_second_places","Total_third_places","Total_rides")
    df = df.select("Jockey_ID",(col("Total_wins")/col("Total_rides")).alias("jockey_first_place_ratio"),\
                                (col("Total_second_places")/col("Total_rides")).alias("jockey_second_place_ratio"),\
                                (col("Total_third_places")/col("Total_rides")).alias("jockey_third_place_ratio"),\
                                ((col("Total_wins") + col("Total_second_places") + col("Total_third_places"))/col("Total_rides")).alias("jockey_place_ratio"),\
                                ((col("Total_rides") - (col("Total_wins") + col("Total_second_places") + col("Total_third_places")))/col("Total_rides")).alias("jockey_lose_ratio"))
    return df

year_threshold = 2015
#race_preprocessing
def race_preprocessing(df):
    
    def return_year(x):
        return int(str(x)[:4])
    
    class_trans_dict = {
        'Hong Kong Group One': 'Group One',
        'Hong Kong Group Three': 'Group Three',
        'Group One': 'Group 1',
        'Class 4 (Special Condition)': 'Class 4',
        'Hong Kong Group Two': 'Group Two',
        'Class 4 (Restricted)': 'Class 4',
        'Class 3 (Special Condition)': 'Group 1',
        'Class 2 (Bonus Prize Money)': 'Class 2',
        'Class 3 (Bonus Prize Money)': 'Class 3',
        'Class 4 (Bonus Prize Money)': 'Class 4',
        '4 Year Olds ': '4 Year Olds',
        'Restricted Race': 'Griffin Race'}
    
    def map_race_class(x):
        if x in class_trans_dict.keys():
            return class_trans_dict[x]
        else:
            return x
    
    def fix_surface_type(x):
        if 'TURF' in str(x):
            return 'Turf'
        else:
            return 'All_Weather'
        
    returnyear_func = udf(return_year,IntegerType())
    map_race_class_func = udf(map_race_class,StringType())
    fix_surface_type_func = udf(fix_surface_type,StringType())
    
    df = df.withColumn("Year",returnyear_func(df["Date"]))
    df = df.select('*').where(f'Year > {year_threshold}')
    
    #getting distinct distance values
    distance_list = df.select("Distance").distinct().orderBy("Distance").rdd.map(lambda x:x.Distance).collect()
    
    #map class based on mapping dictionary
    df = df.withColumn("Class",map_race_class_func(df["Class"]))
    #Concatenate Class and Ranking
    df = df.select('*',concat_ws("_","Class","Ranking"))\
           .withColumnRenamed('concat_ws(_, Class, Ranking)','class_rank')
    #set Surface Type
    df = df.withColumn("Surface_Type",fix_surface_type_func(df["Surface"]))
    df = df.select('*',lit('Short').alias('Distance_Type'))
    df = df.withColumn("Distance_Type",when(((df["Course"] == 'Sha Tin') & (df["Surface_Type"] == "Turf")\
                  & (df["Distance"]  > 1400) & (df['Distance'] <= 1800)),"Medium").\
                       otherwise(df["Distance_Type"]))
    df = df.withColumn("Distance_Type",when(((df["Course"] == 'Sha Tin') & (df["Surface_Type"] == "Turf")\
              & (df['Distance'] > 1800)),"Long").\
                   otherwise(df["Distance_Type"]))
    df = df.withColumn("Distance_Type",when(((df["Course"] == 'Sha Tin') & (df["Surface_Type"] == "All_Weather")\
              & (df['Distance'] > 1300)),"Medium").\
                   otherwise(df["Distance_Type"]))    
    df = df.withColumn("Distance_Type",when(((df["Course"] == 'Happy Valley')\
                  & (df["Distance"]  > 1200) & (df['Distance'] <= 1800)),"Medium").\
                       otherwise(df["Distance_Type"]))
    df = df.withColumn("Distance_Type",when(((df["Course"] == 'Happy Valley')\
                  & (df["Distance"]  > 1800)),"Long").\
                       otherwise(df["Distance_Type"]))
    return df

#record preprocessing

def record_preprocessing(df,df_races,df_horses):
    def parse_placings(x):
        return int(x.split(" ")[0])
    
#     def parse_finish_time(x):
#         print(str(x))
#         x = str(x)
#         if ':' in x:
#             time = int(x.split(":")[0]) * 60 * 100 + int(x.split(":")[1].split(".")[0]) * 100 + int(x.split(":")[1].split(".")[1])
#         else :
# #             print(x)
#             time = int(x.split(".")[0]) * 100 + int(x.split(".")[1])
#         return time#     

    def parse_finish_time(x):
#         print(str(x))
        time = int(x[0]) * 60 * 100 + int(x[2:4]) * 100 + int(x[5:]) 
        time = time / 1000
        return time
    
    splitfunc = udf(parse_placings,IntegerType())
    convert_time = udf(parse_finish_time,FloatType())
    
    print('Start')
    df = df.select('*').where("Place != 'DISQ' AND Place != 'DNF' AND Place != 'FE' AND Place != 'PU' AND Place != 'TNP' AND Place != 'UR' AND Place != 'VOID' AND Place != 'WR' AND Place != 'WV' AND Place != 'WV-A' AND Place != 'WX' AND Place != 'WX-A' AND Place != 'WXNR' AND Place IS NOT NULL")
    df = df.withColumn("Place",splitfunc(df["Place"]))
    df = df.select('*').where('Place != 99 AND Place != 47')
    print('Dropped 99,47 and other Place values')
    
    df = df.select('*').where('Finish_time is not null').where('Place_Section_1 is not null')
    df = df.withColumn("Finish_time_mille_second", convert_time(df['Finish_time']))
    df.show(2)
    
    df = df.drop('Record_ID','Horse_Number','Horse_Code')
    df = df.withColumn('Win_odds',col('Win_odds').cast(FloatType()))
    df = df.join(df_races.select('Race_ID','Course','Prize','Date',"Distance_Type","Class","Ranking","Surface_Type"),'Race_ID')
    df = df.join(df_horses.select("Horse_ID","Age","State","Sex","Foal_Date"),"Horse_ID")
    spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")
    df = df.withColumn('Date',to_date(col('Date'),"yyyy-mm-dd"))
    print('Cast Date')
    df = df.drop('Age')
    first_race_date_df = df.select('Horse_ID','Date').\
                    groupby('Horse_ID').agg(min('Date')).\
                    withColumnRenamed('min(Date)','First_Race_Date').\
                    orderBy('Horse_ID')
    df = df.join(first_race_date_df,"Horse_ID")
    
    #can be changed with average of foal date
    #use a table with foal date and age_at_first_race by using the first_race_date_df
    #floor(datediff(col("First_race_date_df"),col("Foal Date"))/365)
    #join the above with df
    #fill the missing values with the mean value for age_at_first_race
    df = df.select('*',(3 + floor(datediff(col("Date"),col("First_Race_Date"))/365)).\
                       alias('Age_At_Race'))
    
    print(df.show(2))
    #making Win Odds into a value between 0 and 1
    df_sum_win_odds_reciprocal = df.select("Race_ID","Horse_ID","Win_odds","Prize",(1/col("Win_odds")).alias('Reciprocal Win Odds'))\
                     .groupBy("Race_ID").sum("Reciprocal Win Odds")\
                     .withColumnRenamed('sum(Reciprocal Win Odds)','Sum Reciprocal')\
                     .orderBy("Race_ID")
    
    #As we have the sum of reciprocal of the Win Odds of each race, we can divide
    #the price money by this sum, to get the money available after HKJC takes 
    #it's commission
    print('Age at race done')
    df = df.withColumn("Prize",regexp_replace("Prize",",","").cast(IntegerType()))\
                     .select('Race_ID',"Horse_ID","Weight","Weight_Declared",\
                            "Win_odds","Draw","Place","Prize","Course","Surface_Type","Distance_Type","Class","Ranking","Date","State","Sex","First_Race_Date",\
                            "Age_At_Race","Jockey_ID","Trainer_ID", "Finish_time_mille_second")\
                     .join(df_sum_win_odds_reciprocal,"Race_ID")\
                     .select('Race_ID',"Horse_ID","Weight","Weight_Declared",\
                            "Win_odds","Draw","Place","Prize","Course","Surface_Type","Distance_Type","Class","Ranking","Date","State","Sex","First_Race_Date",\
                            "Age_At_Race","Jockey_ID","Trainer_ID",(col("Prize")/col("Sum Reciprocal")).alias("Available Prize Money"), "Finish_time_mille_second")\
                     .select('Race_ID',"Horse_ID","Weight","Weight_Declared",\
                            "Win_odds","Draw","Place","Prize","Course","Surface_Type","Distance_Type","Class","Ranking","Date","State","Sex","First_Race_Date",\
                            "Age_At_Race","Jockey_ID","Trainer_ID",((col("Available Prize Money")/col("Win_odds"))/col("Prize")).alias("Win_odds_%"),"Finish_time_mille_second")\
                     .orderBy("Race_ID")
    print('Win odds calculated')
    #drop weight_declared as it has too many missing values
    df = df.drop('Weight_Declared')
    #As Weight declared has only 12 pieces of data with '---', we drop these too
    df = df.select("*").where("Weight != '---'")
    
    #There are races with only 1 to 4 competitors. These will be dropped
    df_low_placings = None
    for i in range(1,5):
        if df_low_placings == None:
            df_low_placings = df.select("Race_ID","Place")\
                             .groupby("Race_ID")\
                             .agg(max("Place"))\
                             .withColumnRenamed("max(Place)","Place")\
                             .select("Race_ID","Place")\
                             .where(f"Place = {i}")
            df_low_placings.cache()
        else:
            df_low_placings = df_low_placings.union(df.select("Race_ID","Place")\
                             .groupby("Race_ID")\
                             .agg(max("Place"))\
                             .withColumnRenamed("max(Place)","Place")\
                             .select("Race_ID","Place")\
                             .where(f"Place = {i}"))
            df_low_placings.cache()
    #list of race ids with only 1 to 4 competitors
    race_id_list = df_low_placings.select("Race_ID").rdd.map(lambda x:x.Race_ID).collect()
    df = df.select('Race_ID',"Horse_ID","Weight","Age_At_Race",\
                            "Win_odds","Win_odds_%","Draw","Place","Prize","Surface_Type","Distance_Type","Class","Ranking","Course","Date","State","Sex","First_Race_Date","Jockey_ID","Trainer_ID","Finish_time_mille_second")\
                            .where(~col("Race_ID").isin(race_id_list))
    return df

#horse_preprocessing
def horse_preprocessing(df):
    df = df.select('Horse_ID', 'State', 'Country', 'Age', 'Color', 'Sex', 'Import_type', 'Total_Stakes', 'Last_Rating')
    return df

#sectional_preprocessing
def sectional_preprocessing(df):
    df = df.select('Race_ID', 'Horse_ID', 'Finish_time')
    return df

def foal_preprocessing(df_horse,df_foal):
    df = df_horse.join(df_foal.select('Horse_ID','Foal_Date'),'Horse_ID','left')
    return df

def calculate_win_percentage(partition):
    for horse in partition:
        horse_id = horse[0]
        win_count = 0
        total_count = 0
        win_percentage = list()
        for i in horse[1]:
            total_count += 1
            #i is a tuple having race_id,data,place
            if i[2] == 1:
                win_count += 1
            win_percentage.append((horse_id,i[0],i[1],(win_count/total_count) * 100))
            
            
        yield (horse_id,win_percentage)

def calculate_place_percentage(partition):
    for horse in partition:
        horse_id = horse[0]
        place_count = 0
        total_count = 0
        place_percentage = list()
        for i in horse[1]:
            total_count += 1
            #i is a tuple having race_id,data,place
            if (i[2] == 1) or (i[2] == 2) or (i[2] == 3):
                place_count += 1
            place_percentage.append((horse_id,i[0],i[1],(place_count/total_count) * 100))
            
        yield (horse_id,place_percentage)

#Example of Divide and Conquer being used
#pass in the records df to this function
def get_win_and_place_percentage_df(df,spark):
    sc = spark.sparkContext
    df_horse_place = df.select('Race_ID','Horse_ID','Date','Place').\
        groupby('Horse_ID','Race_ID','Date').agg(max(col('Place'))).\
        withColumnRenamed('max(Place)','Place').\
        orderBy('Horse_ID','Date')
    #difficult to apply pandas type operations on sparksql
    #requires pyarrow which isnt installing 
    #Turn to RDD and use Divide and conquer
    horse_place_rdd = df_horse_place.rdd
    horse_place_rdd = horse_place_rdd.map(lambda x: (x.Horse_ID,(x.Race_ID,x.Date,x.Place)))
    #Group by key to get all races that a horse has participated in
    #Key is horse ID
    #Make the values to a list format while maintaining the partitioning
    #that we get by groupByKey by using mapValues
    grouped_horse_id_rdd = horse_place_rdd.groupByKey().mapValues(list)
    #apply the mapPartitions method to do D&C
    win_percent = grouped_horse_id_rdd.mapPartitions(calculate_win_percentage)
    place_percent = grouped_horse_id_rdd.mapPartitions(calculate_place_percentage)
    #result is mapped to get only the values from the key,value pair
    #then we flatMap it to get to rdd format for dataframe
    win_percent_rdd = win_percent.map(lambda x: x[1]).flatMap(lambda x:x)
    place_percent_rdd = place_percent.map(lambda x:x[1]).flatMap(lambda x:x)
    schema_win_percent  = StructType([
    StructField("Horse_ID",IntegerType(),True),
    StructField("Race_ID",IntegerType(),True),
    StructField("Date",DateType(),True),
    StructField("Win_Perc",FloatType(),True)
    ])

    win_percent_dataframe = spark.createDataFrame(win_percent_rdd,schema_win_percent)

    schema_place_percent  = StructType([
    StructField("Horse_ID",IntegerType(),True),
    StructField("Race_ID",IntegerType(),True),
    StructField("Date",DateType(),True),
    StructField("Place_Perc",FloatType(),True)
    ])

    place_percent_dataframe = spark.createDataFrame(place_percent_rdd,schema_place_percent)

    return win_percent_dataframe,place_percent_dataframe

# Get wather data from MongoDB
# def get_weather_data(spark):
#     pipeline1 = "{'$project': {'day': 1,'month':1,'year':1,'sha_tin_max':1,'sha_tin_min':1,'_id':0}}"
#     pipeline2 = "{'$project': {'day': 1,'month':1,'year':1,'happy_velley_max':1,'happy_velley_min':1,'_id':0}}"
#     df1 = spark.read.format("mongo").option('pipeline',pipeline1).load()
#     df2 = spark.read.format("mongo").option('pipeline',pipeline2).load()
#     df1 = df1.select('*',lit('Sha Tin').alias('Course'))\
#              .withColumnRenamed('sha_tin_max','MaxTemp')\
#              .withColumnRenamed('sha_tin_min','MinTemp')
#     df2 = df2.select('*',lit('Happy Valley').alias('Course'))\
#              .withColumnRenamed('happy_velley_max','MaxTemp')\
#              .withColumnRenamed('happy_velley_min','MinTemp')
#     df1 = df1.select(concat_ws('/',df1.month.cast(IntegerType()),df1.day.cast(IntegerType()),df1.year.cast(IntegerType())).alias('Date'),'Course','MaxTemp','MinTemp')
#     df2 = df2.select(concat_ws('/',df2.month.cast(IntegerType()),df2.day.cast(IntegerType()),df2.year.cast(IntegerType())).alias('Date'),'Course','MaxTemp','MinTemp')
#     spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")
#     df1 = df1.withColumn('Date',to_date(col('Date'),"M/dd/yyyy"))
#     df2 = df2.withColumn('Date',to_date(col('Date'),"M/dd/yyyy"))
#     return df1.union(df2)

# Get weather_data in 
def get_weather_data(spark):
#     pipeline1 = "{'$project': {'day': 1,'month':1,'year':1,'sha_tin_max':1,'sha_tin_min':1,'_id':0}}"
#     pipeline2 = "{'$project': {'day': 1,'month':1,'year':1,'happy_velley_max':1,'happy_velley_min':1,'_id':0}}"
#     df1 = spark.read.format("mongo").option('pipeline',pipeline1).load()
#     df2 = spark.read.format("mongo").option('pipeline',pipeline2).load()
    raw = spark.read.csv('weather_data.csv',header=True, inferSchema=True)

    
    df1 = raw.select('day','month','year','sha_tin_max','sha_tin_min','_c0')
    df2 = raw.select('day','month','year',"happy_velley_max",'happy_velley_min','_c0')
    
    
    df1 = df1.select('*',lit('Sha Tin').alias('Weather_Course'))\
             .withColumnRenamed('sha_tin_max','MaxTemp')\
             .withColumnRenamed('sha_tin_min','MinTemp')\
             .withColumnRenamed('_c0','_id') 
    
    df2 = df2.select('*',lit('Happy Valley').alias('Weather_Course'))\
             .withColumnRenamed('happy_velley_max','MaxTemp')\
             .withColumnRenamed('happy_velley_min','MinTemp')\
             .withColumnRenamed('_c0','_id')
    
    df1 = df1.select(concat_ws('/',df1.month.cast(IntegerType()),df1.day.cast(IntegerType()),df1.year.cast(IntegerType())).alias('Weather_Date'),'Weather_Course','MaxTemp','MinTemp')
    df2 = df2.select(concat_ws('/',df2.month.cast(IntegerType()),df2.day.cast(IntegerType()),df2.year.cast(IntegerType())).alias('Weather_Date'),'Weather_Course','MaxTemp','MinTemp')
    spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")
    df1 = df1.withColumn('Weather_Date',to_date(col('Weather_Date'),"M/dd/yyyy"))
    df2 = df2.withColumn('Weather_Date',to_date(col('Weather_Date'),"M/dd/yyyy"))
    return df1.union(df2)




df_races, df_races_sectional, df_trainer, df_jockeys, df_records, df_horse, df_sectional, df_foal = read_data(spark)

df_trainer = trainer_preprocessing(df_trainer)
df_jockeys = jockey_preprocessing(df_jockeys)
df_races = race_preprocessing(df_races)
df_horse = horse_preprocessing(df_horse)
df_horse = foal_preprocessing(df_horse,df_foal)
#note that in records_preprocessing, races and horse are already joined into the dataframe
df_records = record_preprocessing(df_records,df_races,df_horse)

df_weather = get_weather_data(spark)
df_records = df_records.join(df_weather,(df_records["Date"] == df_weather["Weather_Date"]) & (df_records["Course"] == df_weather["Weather_Course"]))

df_records_jockey = df_records.join(df_jockeys,"Jockey_ID",'left')
df_records_jockey_trainer = df_records_jockey.join(df_trainer,"Trainer_ID",'left')
df_win_percent,df_place_percent = get_win_and_place_percentage_df(df_records,spark)
df_records_jockey_trainer_win = df_records_jockey_trainer.join(df_win_percent,['Horse_ID','Race_ID','Date'])
print('Win percentage and place percentage calculated')
df_records_jockey_trainer_win_place = df_records_jockey_trainer_win.join(df_place_percent,['Horse_ID','Race_ID','Date'])
print(df_records_jockey_trainer_win_place.count())
print('done')
# spark.stop()





df_dataset = df_records_jockey_trainer_win_place.select('Date','Weight','Age_At_Race','Win_odds','Win_odds_%', 'Draw',\
                                                       'Prize','Surface_Type','Distance_Type',"Class","Ranking",\
                                                       'Course','Sex','MaxTemp','MinTemp','jockey_first_place_ratio',\
                                                       'jockey_second_place_ratio','jockey_third_place_ratio',\
                                                       'jockey_place_ratio','trainer_first_place_ratio','trainer_second_place_ratio',
                                                       'trainer_third_place_ratio','trainer_place_ratio','Win_Perc','Place_Perc',\
                                                        'Finish_time_mille_second' )

# df_dataset = df_dataset.withColumn("Target",df_dataset.Weight.cast(FloatType()))
df_dataset = df_dataset.withColumn("Weight",df_dataset.Weight.cast(FloatType()))
df_dataset = df_dataset.withColumn("Draw",df_dataset.Weight.cast(FloatType()))
df_dataset = df_dataset.withColumnRenamed("Finish_time_mille_second", "Target")

df_dataset = df_dataset.select('*').where('Ranking is not null and Sex is not null and Course is not null and Class is not null and Distance_Type is not null and Surface_Type is not null')

features_cols =  ["Weight","Age_At_Race","Win_odds","Win_odds_%","Draw","Prize","Surface_Type_index","Distance_Type_index",
"Class_index","Ranking_index","Course_index","Sex_index","MaxTemp","MinTemp","jockey_first_place_ratio","jockey_second_place_ratio",
"jockey_third_place_ratio","jockey_place_ratio","trainer_first_place_ratio","trainer_second_place_ratio",
"trainer_third_place_ratio","trainer_place_ratio","Win_Perc","Place_Perc"]

string_cols = ["Surface_Type","Distance_Type","Class","Ranking","Course","Sex"]
string_cols_idx = ["Surface_Type_index","Distance_Type_index","Class_index","Ranking_index","Course_index","Sex_index"]
    
StringIdxer = StringIndexer(inputCols=string_cols, outputCols=string_cols_idx)

df_dataset2 = StringIdxer.fit(df_dataset).transform(df_dataset)
df_dataset2.cache()




Start
Dropped 99,47 and other Place values


Traceback (most recent call last):
  File "/home/gt/spark-3.0.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/daemon.py", line 186, in manager
  File "/home/gt/spark-3.0.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/home/gt/spark-3.0.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 642, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/home/gt/spark-3.0.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 595, in read_int
    raise EOFError
EOFError
Traceback (most recent call last):
  File "/home/gt/spark-3.0.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/daemon.py", line 186, in manager
  File "/home/gt/spark-3.0.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/home/gt/spark-3.0.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 642, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/home/gt/spark-3.0.3-bin-hadoop2.7/

+---------+-------+-----+------------+--------+----------+-----------------+------+---------------+----+----------------+--------+----------+---------------+---------------+---------------+---------------+-----------+---------+--------+----------+--------+------------------------+
|Record_ID|Race_ID|Place|Horse_Number|Horse_ID|Horse_Code|       Horse_Name|Weight|Weight_Declared|Draw|Distance_Between|Win_odds|Place_odds|Place_Section_1|Place_Section_2|Place_Section_3|Place_Section_4|Finish_time|Jockey_ID|  Jockey|Trainer_ID| Trainer|Finish_time_mille_second|
+---------+-------+-----+------------+--------+----------+-----------------+------+---------------+----+----------------+--------+----------+---------------+---------------+---------------+---------------+-----------+---------+--------+----------+--------+------------------------+
|        2|      1|    1|           5|    1628|      B114|  SHANGHAI DRAGON|   130|           1186|   5|               -|     6.6|       2.4|             

                                                                                

+--------+-------+-----+----------------+------+---------------+----+----------------+--------+----------+---------------+---------------+---------------+---------------+-----------+---------+--------+----------+-------+------------------------+------------+---------+----------+-------------+-------+-------+------------+------+--------+---------+---------------+-----------+
|Horse_ID|Race_ID|Place|      Horse_Name|Weight|Weight_Declared|Draw|Distance_Between|Win_odds|Place_odds|Place_Section_1|Place_Section_2|Place_Section_3|Place_Section_4|Finish_time|Jockey_ID|  Jockey|Trainer_ID|Trainer|Finish_time_mille_second|      Course|    Prize|      Date|Distance_Type|  Class|Ranking|Surface_Type| State|     Sex|Foal_Date|First_Race_Date|Age_At_Race|
+--------+-------+-----+----------------+------+---------------+----+----------------+--------+----------+---------------+---------------+---------------+---------------+-----------+---------+--------+----------+-------+------------------------+-

                                                                                

Win percentage and place percentage calculated


                                                                                ]

55404
done


                                                                                ]

DataFrame[Date: date, Weight: float, Age_At_Race: bigint, Win_odds: float, Win_odds_%: double, Draw: float, Prize: int, Surface_Type: string, Distance_Type: string, Class: string, Ranking: string, Course: string, Sex: string, MaxTemp: double, MinTemp: double, jockey_first_place_ratio: double, jockey_second_place_ratio: double, jockey_third_place_ratio: double, jockey_place_ratio: double, trainer_first_place_ratio: double, trainer_second_place_ratio: double, trainer_third_place_ratio: double, trainer_place_ratio: double, Win_Perc: float, Place_Perc: float, Target: float, Sex_index: double, Class_index: double, Ranking_index: double, Distance_Type_index: double, Surface_Type_index: double, Course_index: double]

In [54]:
schema = df_dataset2.schema

In [58]:
# We now repartition the test data and break them down into 10 different files and write it to a csv file.
testData = df_dataset2.repartition(10)#Remove directory in case we rerun it multiple times.

# dbutils.fs.rm("/train_data/",True)#Create a directory

# testData.write.format("CSV").option("header",True).save("/train_data")

testData.repartition(10).write.csv("/home/gt/spark-3.0.3-bin-hadoop2.7/MSBD5003/train_data/temp1.csv")
testData.repartition(10).write.csv("/home/gt/spark-3.0.3-bin-hadoop2.7/MSBD5003/train_data/temp2.csv")
testData.repartition(10).write.csv("/home/gt/spark-3.0.3-bin-hadoop2.7/MSBD5003/train_data/temp3.csv")
testData.repartition(10).write.csv("/home/gt/spark-3.0.3-bin-hadoop2.7/MSBD5003/train_data/temp4.csv")

In [77]:
# ssc = StreamingContext(sc, 10)

df_dataset = spark.readStream.format("csv")\
                  .option("header",True)\
                    .schema(schema)\
                    .load("/home/gt/spark-3.0.3-bin-hadoop2.7/MSBD5003/train_data")

# df_dataset_schema = df_dataset.schema

# df_dataset_rdd = df_dataset.rdd

# df_dataset_rdd_Queue = df_dataset_rdd.randomSplit([1,1,1,1,1], 123)

# ssc = StreamingContext(sc, 60)

# df_lines = ssc.queueStream(df_dataset_rdd_Queue)
# df_lines = df_lines.map(lambda x: (x[0], x[1], x[2], x[3], x[4], x[5], x[6], x[7], x[8], x[9], x[10], x[11], x[12], x[13], x[14], x[15], x[16], x[17], x[18], x[19], x[20], x[21], x[22], x[23]))

# df_dataset = spark.createDataFrame(df_lines, schema = df_dataset_schema)
                                   


# VectorAssm = VectorAssembler(inputCols=features_cols, outputCol='features')
# df_vector_dataset = VectorAssm.transform(df_dataset2)

# scaler = MinMaxScaler(inputCol = "features" , outputCol="features_scaled")
# df_vector_dataset_scaled = scaler.fit(df_vector_dataset).transform(df_vector_dataset)

                                   
        
# # Split the data into training and test sets (30% held out for testing)
# (trainingData, testData) = df_vector_dataset_scaled.randomSplit([0.9, 0.1])

# # featureIndexer = VectorIndexer(inputCol = "features", outputCol="indexedFeatures", maxCategories=40).fit(df_vector_dataset)
# featureIndexer = VectorIndexer(inputCol = "features_scaled", outputCol="indexedFeatures").fit(df_vector_dataset_scaled)

# # Train a GBT model.
# gbt = GBTRegressor(featuresCol="indexedFeatures", labelCol="Target", maxIter=20)
# rf = RandomForestRegressor(featuresCol="indexedFeatures", labelCol="Target")

# # Chain indexer and GBT in a Pipeline
# pipeline = Pipeline(stages=[featureIndexer, gbt])
# pipeline_rf = Pipeline(stages=[featureIndexer, rf])

# # Train model.  This also runs the indexer.
# model = pipeline.fit(trainingData)

# # model_rf = pipeline_rf.fit(trainingData)

# # Make predictions.
# predictions = model.transform(testData)

# # predictions_rf = model_rf.transform(testData)

# # # Select example rows to display.
# # predictions.select("prediction", "Target", "features").show(5)


# # Select (prediction, true label) and compute test error
# evaluator = RegressionEvaluator(
#     labelCol="Target", predictionCol="prediction", metricName="rmse")

# rmse = evaluator.evaluate(predictions)
# print("GBT Root Mean Squared Error (RMSE) on test data = %g" % rmse)

# # rmse = evaluator.evaluate(predictions_rf)
# # print("RF Root Mean Squared Error (RMSE) on test data = %g" % rmse)

def funct1(rdd):
    yield rdd.count()

# aggDF = df_dataset.count()


df_dataset \
    .writeStream \
    .format("console") \
    .start()

# query.awaitTermination()

# ssc.start()
# ssc.awaitTermination(25)
# ssc.stop(False)
# print("Finished")

Py4JJavaError: An error occurred while calling o9341.load.
: java.lang.ClassNotFoundException: Failed to find data source: *.csv. Please find packages at http://spark.apache.org/third-party-projects.html
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:689)
	at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:195)
	at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:243)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: *.csv.DefaultSource
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:663)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:663)
	at scala.util.Failure.orElse(Try.scala:224)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:663)
	... 13 more


In [8]:
# from flask import Flask, request
# app = Flask(__name__)

# @app.route('/whateverYouWant', methods=['POST'])  #can set first param to '/'

# def toyFunction():
#     posted_data = sc.parallelize([request.get_data()])
#     return str(posted_data.collect()[0])

# if __name__ == '__main_':
#     app.run(port=7000)    #note set to 8080!
    
    
    

In [9]:
%%writefile run.py
from flask import Flask
from flask import Flask, render_template

app = Flask(__name__)

@app.route("/")
def prediction_results():
    return render_template('main.html',Date='2021-12-18', Pred_date='2021-12-18')


Overwriting run.py


In [10]:
import subprocess as sp

server = sp.Popen("FLASK_APP=run.py flask run", shell=True)
server


<subprocess.Popen at 0x7f2afd750a90>

 * Serving Flask app 'run.py' (lazy loading)
 * Environment: production
   Use a production WSGI server instead.
 * Debug mode: off


Traceback (most recent call last):
  File "/home/gt/.local/bin/flask", line 8, in <module>
    sys.exit(main())
  File "/home/gt/.local/lib/python3.8/site-packages/flask/cli.py", line 994, in main
    cli.main(args=sys.argv[1:])
  File "/home/gt/.local/lib/python3.8/site-packages/flask/cli.py", line 600, in main
    return super().main(*args, **kwargs)
  File "/home/gt/.local/lib/python3.8/site-packages/click/core.py", line 1053, in main
    rv = self.invoke(ctx)
  File "/home/gt/.local/lib/python3.8/site-packages/click/core.py", line 1659, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/gt/.local/lib/python3.8/site-packages/click/core.py", line 1395, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/gt/.local/lib/python3.8/site-packages/click/core.py", line 754, in invoke
    return __callback(*args, **kwargs)
  File "/home/gt/.local/lib/python3.8/site-packages/click/decorators.py", line 84, in new_func
    return ctx.invoke

In [17]:
import requests

requests.get(url="http://127.0.0.1:5000").content

b'<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">\n<title>500 Internal Server Error</title>\n<h1>Internal Server Error</h1>\n<p>The server encountered an internal error and was unable to complete your request. Either the server is overloaded or there is an error in the application.</p>\n'

In [15]:
server.terminate()

                                                                                

Win percentage and place percentage calculated




55404
done


                                                                                

In [163]:
df_records_jockey_trainer_win_place.rdd.take(1)

                                                                                ]

[Row(Horse_ID=41, Race_ID=26159, Date=datetime.date(2021, 1, 10), Trainer_ID=14, Jockey_ID=5, Weight='132', Age_At_Race=4, Win_odds=4.199999809265137, Win_odds_%=0.19485487860215805, Draw='6', Place=3, Prize=1000000, Surface_Type='Turf', Distance_Type='Medium', Class='Class 4', Ranking='60-40', Course='Happy Valley', State='Active', Sex=' Gelding', First_Race_Date=datetime.date(2020, 1, 1), Finish_time_mille_second=10.14799976348877, Weather_Date=datetime.date(2021, 1, 10), Weather_Course='Happy Valley', MaxTemp=16.4, MinTemp=11.5, jockey_first_place_ratio=0.21294117647058824, jockey_second_place_ratio=0.1468627450980392, jockey_third_place_ratio=0.1203921568627451, jockey_place_ratio=0.4801960784313726, jockey_lose_ratio=0.5198039215686274, trainer_first_place_ratio=0.06744157888364209, trainer_second_place_ratio=0.0739799007143722, trainer_third_place_ratio=0.07434314081607943, trainer_place_ratio=0.21576462041409372, trainer_lose_ratio=0.7842353795859063, Win_Perc=0.0, Place_Perc=21

In [1]:

####
# Save the df_dataset to mongo
####


In [166]:
df_dataset.select(max('Date')).show()



+----------+
| max(Date)|
+----------+
|2021-01-31|
+----------+





In [121]:
train_date = '1/1/2008'
test_date = '1/10/2021'

#########
# Get data from mongo
#########



In [122]:
df_dataset.show(1)



+----------+------+-----------+--------+-------------------+-----+-------+------------+-------------+-------+-------+------------+--------+-------+-------+------------------------+-------------------------+------------------------+------------------+-------------------------+--------------------------+-------------------------+-------------------+--------+----------+------+
|      Date|Weight|Age_At_Race|Win_odds|         Win_odds_%| Draw|  Prize|Surface_Type|Distance_Type|  Class|Ranking|      Course|     Sex|MaxTemp|MinTemp|jockey_first_place_ratio|jockey_second_place_ratio|jockey_third_place_ratio|jockey_place_ratio|trainer_first_place_ratio|trainer_second_place_ratio|trainer_third_place_ratio|trainer_place_ratio|Win_Perc|Place_Perc|Target|
+----------+------+-----------+--------+-------------------+-----+-------+------------+-------------+-------+-------+------------+--------+-------+-------+------------------------+-------------------------+------------------------+---------------



In [124]:
# df_dataset.select('class_rank').distinct().count()

In [125]:
df_dataset.select('Prize').distinct().count()

                                                                                ]]

62

In [126]:
df_dataset.select('Surface_Type').where('Surface_Type is null').show()

                                                                                ]

+------------+
|Surface_Type|
+------------+
+------------+



In [127]:
df_dataset.select('Distance_Type').where('Distance_Type is null').show()

+-------------+
|Distance_Type|
+-------------+
+-------------+



In [128]:
df_dataset.select('Class').where('Class is null').show()

                                                                                

+-----+
|Class|
+-----+
+-----+



In [129]:
df_dataset.select('Ranking').where('Ranking is null').count()

                                                                                ]

2028

In [130]:
df_dataset.select('Course').where('Course is null').show()

+------+
|Course|
+------+
+------+



In [131]:
df_dataset.select('Sex').where('Sex is null').show()

                                                                                ]

+---+
|Sex|
+---+
+---+



NameError: name 'df_dataset' is not defined

In [133]:
df_dataset.count()

                                                                                ]]

53376

In [134]:
all_cols = df_dataset.schema.names

In [135]:
# df_dataset = df_dataset.withColumn()

In [136]:
# scaler1 = MinMaxScaler(inputCol = "Target" , outputCol="Target_scaled")


In [137]:
# df_dataset_sacled = scaler1.fit(df_dataset).transform(df_dataset)

In [138]:
# df_dataset_sacled = scaler.fit(df_vector_dataset).transform(df_vector_dataset)

In [139]:
# df_dataset_sacled.show(1)

In [140]:
features_cols =  ["Weight","Age_At_Race","Win_odds","Win_odds_%","Draw","Prize","Surface_Type_index","Distance_Type_index",
"Class_index","Ranking_index","Course_index","Sex_index","MaxTemp","MinTemp","jockey_first_place_ratio","jockey_second_place_ratio",
"jockey_third_place_ratio","jockey_place_ratio","trainer_first_place_ratio","trainer_second_place_ratio",
"trainer_third_place_ratio","trainer_place_ratio","Win_Perc","Place_Perc"]

string_cols = ["Surface_Type","Distance_Type","Class","Ranking","Course","Sex"]
string_cols_idx = ["Surface_Type_index","Distance_Type_index","Class_index","Ranking_index","Course_index","Sex_index"]
    


In [3]:
# df_dataset2.show(1)

In [149]:


# featureIndexer = VectorIndexer(inputCol = "features", outputCol="indexedFeatures", maxCategories=40).fit(df_vector_dataset)
featureIndexer = VectorIndexer(inputCol = "features_scaled", outputCol="indexedFeatures").fit(df_vector_dataset_scaled)

# Train a GBT model.


# Chain indexer and GBT in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, dt])

# Train model.  This also runs the indexer.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# # Select example rows to display.
# predictions.select("prediction", "Target", "features").show(5)



                                                                                ]

In [150]:
predictions.show(1)



+----------+------+-----------+--------+------------------+-----+------+------------+-------------+-------+-------+-------+--------+-------+-------+------------------------+-------------------------+------------------------+------------------+-------------------------+--------------------------+-------------------------+-------------------+--------+----------+------+---------+-----------+-------------+-------------------+------------------+------------+--------------------+--------------------+--------------------+-----------------+
|      Date|Weight|Age_At_Race|Win_odds|        Win_odds_%| Draw| Prize|Surface_Type|Distance_Type|  Class|Ranking| Course|     Sex|MaxTemp|MinTemp|jockey_first_place_ratio|jockey_second_place_ratio|jockey_third_place_ratio|jockey_place_ratio|trainer_first_place_ratio|trainer_second_place_ratio|trainer_third_place_ratio|trainer_place_ratio|Win_Perc|Place_Perc|Target|Sex_index|Class_index|Ranking_index|Distance_Type_index|Surface_Type_index|Course_index|    





Root Mean Squared Error (RMSE) on test data = 0.655814




In [152]:
gbtModel = model.stages[1]
print(gbtModel)  # summary only

RandomForestRegressionModel: uid=RandomForestRegressor_c50d8df844f9, numTrees=20, numFeatures=24


In [153]:
prediction.select('features')

NameError: name 'prediction' is not defined

In [157]:
df_vector_dataset_scaled.select(max('Date')).show()



+----------+
| max(Date)|
+----------+
|2021-01-31|
+----------+



