In [2]:
import pyspark
from pyspark.sql import DataFrame
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import *


In [3]:

def extract_bets(spark, debug = 0):
    df_bets = spark.read.parquet("../Case_1/bets_v1/part-00000-4ea82782-6afd-434c-8fee-42264167ffe1-c000.snappy.parquet")
    # Cast some fields
    #df_bets = df_bets.withColumn("legs.price.decimal", transform(df_bets["legs.price.decimal"], lambda x, i: (df_bets["legs.price.decimal"][i]).cast('float')))
    df_bets = df_bets.withColumn("account_id", col("account_id").cast('int'))
    df_bets = df_bets.withColumn("legs", transform(col("legs"), lambda x, i: x.price.withField("decimal", x.price.decimal.cast('float'))))
    
    if debug == 1:
        df_bets.show(truncate=0)
        df_bets.printSchema()
    
    return df_bets

In [4]:

def extract_trans(spark, debug = 0):
    df_trans = spark.read.parquet("../Case_1/trans_v1/part-00000-6d83da89-6ef2-4edc-8446-7838dce4bd1d-c000.snappy.parquet")    

    if debug == 1:
        df_trans.show(truncate=0)
        df_trans.printSchema()
    
    return df_trans

In [5]:
spark = SparkSession.builder.appName("case").getOrCreate()
extract_bets(spark, 1)

+------------------------------------+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

DataFrame[sportsbook_id: string, account_id: int, legs: array<struct<num:int,den:int,decimal:float,americanOdds:string>>, markets: array<struct<outcomeRef:string,marketRef:string,eventRef:string,categoryRef:string,outcomeName:string,eventTypeName:string,className:string,marketName:string,eventName:string,eventStartTime:string>>]

In [6]:
extract_trans(spark, 1)

+------------------------------------+------------------------------------+---------+-----------------+---------+
|sportsbook_id                       |trans_uuid                          |transType|config           |deltaCash|
+------------------------------------+------------------------------------+---------+-----------------+---------+
|dcac73b6-18e4-4c9a-8900-9935e0e6b2fa|07862be1-5c33-75ab-4b41-43d73e982405|ADJRESULT|Unsettlement_Lose|0.0      |
|66ce7381-8a05-4aa3-aae5-c6bc65295a0c|125a6cc9-7de5-9cf8-15c2-e6cbfce633fd|RESULT   |Settlement_Lose  |0.0      |
|abda5fcf-9d5d-4534-954e-aacc42432725|23cb5474-2337-ce1d-2400-53077e7909e1|RESULT   |Settlement_Lose  |0.0      |
|dcac73b6-18e4-4c9a-8900-9935e0e6b2fa|65d2188b-a3ab-41e7-670d-c1bb89d673f6|WAGER    |SBOB_WAGER       |-0.34    |
|dcac73b6-18e4-4c9a-8900-9935e0e6b2fa|68f7914a-17bb-9b5d-d677-c204ce045548|ADJRESULT|Resettlement_Win |0.7      |
|66ce7381-8a05-4aa3-aae5-c6bc65295a0c|866205be-b9da-4887-49af-c081696505f3|WAGER    |SBO

DataFrame[sportsbook_id: string, trans_uuid: string, transType: string, config: string, deltaCash: double]

In sportsbetting it's possible to bet on a combination of events. Information on these events can be found in table `bets_v1`. However per bet the source provides data in two separate arrays. The odds are received in a column called 'legs' and the type of bet and the name of the sports-event can be found in the 'markets' column. Your goal is to read `bets_v1` and add a column 'outcomes' which combines these two arrays, such that it becomes clear what are the odds for a specific market (i.e. a single list of outcomes in which all the information of an outcome is available).

In [7]:
bets_bronze = extract_bets(spark)

def add_outcomes_to_bets_bronze(df):
    df = df.withColumn("outcomes", transform(
    arrays_zip(col("legs"), col("markets")), lambda x: x
    ))
    return df

bets_bronze = add_outcomes_to_bets_bronze(bets_bronze)
bets_bronze.show()

+--------------------+----------+--------------------+--------------------+--------------------+
|       sportsbook_id|account_id|                legs|             markets|            outcomes|
+--------------------+----------+--------------------+--------------------+--------------------+
|4e781821-9bd5-43d...| 112164134|[{7, 4, 2.75, +17...|[{35382838, 10203...|[{{7, 4, 2.75, +1...|
|66ce7381-8a05-4aa...| 112164134|[{7, 4, 2.75, +17...|[{35382838, 10203...|[{{7, 4, 2.75, +1...|
|abda5fcf-9d5d-453...| 112164134|[{7, 4, 2.75, +17...|[{35508011, 10307...|[{{7, 4, 2.75, +1...|
|dcac73b6-18e4-4c9...| 114856230|[{37, 50, 1.74, -...|[{35644601, 10344...|[{{37, 50, 1.74, ...|
+--------------------+----------+--------------------+--------------------+--------------------+



In [8]:

trans_bronze = extract_trans(spark)

def trans_transactions(df):
    df = df.groupBy(col("sportsbook_id")).agg(collect_list(struct( [col(column) for column in trans_bronze.columns if column != "sportsbook_id"] )).alias("transactions"))
    return df

trans_bronze = trans_transactions(trans_bronze)
trans_bronze.show()

+--------------------+--------------------+
|       sportsbook_id|        transactions|
+--------------------+--------------------+
|abda5fcf-9d5d-453...|[{23cb5474-2337-c...|
|dcac73b6-18e4-4c9...|[{07862be1-5c33-7...|
|66ce7381-8a05-4aa...|[{125a6cc9-7de5-9...|
|4e781821-9bd5-43d...|[{c62c05b8-a0df-f...|
+--------------------+--------------------+



In [9]:
bets_silver = bets_bronze.join(trans_bronze, bets_bronze.sportsbook_id == trans_bronze.sportsbook_id, "left").drop(trans_bronze.sportsbook_id)
bets_silver.show(truncate=0)

+------------------------------------+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [10]:
bet_gold = bets_silver.select(["sportsbook_id", "account_id", "outcomes", "transactions"])
bet_gold.show()

+--------------------+----------+--------------------+--------------------+
|       sportsbook_id|account_id|            outcomes|        transactions|
+--------------------+----------+--------------------+--------------------+
|4e781821-9bd5-43d...| 112164134|[{{7, 4, 2.75, +1...|[{c62c05b8-a0df-f...|
|66ce7381-8a05-4aa...| 112164134|[{{7, 4, 2.75, +1...|[{125a6cc9-7de5-9...|
|abda5fcf-9d5d-453...| 112164134|[{{7, 4, 2.75, +1...|[{23cb5474-2337-c...|
|dcac73b6-18e4-4c9...| 114856230|[{{37, 50, 1.74, ...|[{07862be1-5c33-7...|
+--------------------+----------+--------------------+--------------------+



In [1]:
import pyarrow.parquet as pq
pq.read_table('./Case_1/bets_interview_completed.parquet')

FileNotFoundError: ./Case_1/bets_interview_completed.parquet