In [5]:
from delta import *
from currency_converter import CurrencyConverter, ECB_URL

import pyspark
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import FloatType
from pyspark.sql.types import DoubleType
from pyspark.sql.window import Window

builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.driver.memory", "10g") \
    .config("spark.driver.maxResultSize", "10g")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [6]:
bets_df = spark.read.format("delta").load("/Users/andreikirpichev/Downloads/bet_stats/")
fixtures_df = spark.read.format("delta").load("/Users/andreikirpichev/Downloads/fixtures/")
customers_df = spark.read.format("delta").load("/Users/andreikirpichev/Downloads/customers/")

recent_dates = ['2022-10-02', '2022-10-03', '2022-10-04', '2022-10-05', '2022-10-06', '2022-10-07', '2022-10-08', '2022-10-09', '2022-10-10', '2022-10-11', '2022-10-12', '2022-10-13', '2022-10-14', '2022-10-15', '2022-10-16', '2022-10-17', '2022-10-18', '2022-10-19', '2022-10-20', '2022-10-21', '2022-10-22', '2022-10-23', '2022-10-24', '2022-10-25', '2022-10-26', '2022-10-27', '2022-10-28', '2022-10-29', '2022-10-30', '2022-10-01', '2022-09-02', '2022-09-03', '2022-09-04', '2022-09-05', '2022-09-06', '2022-09-07', '2022-09-08', '2022-09-09', '2022-09-10', '2022-09-11', '2022-09-12', '2022-09-13', '2022-09-14', '2022-09-15', '2022-09-16', '2022-09-17', '2022-09-18', '2022-09-19', '2022-09-20', '2022-09-21', '2022-09-22', '2022-09-23', '2022-09-24', '2022-09-25', '2022-09-26', '2022-09-27', '2022-09-28', '2022-09-29', '2022-09-30', '2022-09-01', '2022-09-31']

bets_df = bets_df.filter(col("placement_date").isin(recent_dates))\
                 .withColumn('contest_id', split('contest_id', '\|').getItem(1))

bf = bets_df.join(fixtures_df, "contest_id", "left")
data = bf.join(customers_df, bf.customer_id == customers_df.id, "left")

columns = ['customer_id', 'age', 'country', 'section', 'point', 'market', 'market_params', 'contest_id', 'placement_date', 'placement_time', 'placement_odds', 'home', 'away', 'region', 'sport', 'league', 'minutes_before_start', 'stake_eur']
# 'bet_no', 'pick_index', 'fees', 'outcomes', 'currency', 'stake', 'placement_time', 'betting_types', 'scheduled_begin', 'birthdate'
c = CurrencyConverter(ECB_URL, fallback_on_missing_rate=True)
convert_curr = udf(lambda v,f,d : c.convert(v, f, 'EUR', d), DoubleType())

weekday_until_now = Window.partitionBy("customer_id", "dayofweek").rowsBetween(Window.unboundedPreceding, Window.currentRow)
section_last2weeks = Window.partitionBy("customer_id", "section").orderBy(expr("unix_date(placement_date)").asc()).rangeBetween(-14, 0)
point_last2weeks = Window.partitionBy("customer_id", "point").orderBy(expr("unix_date(placement_date)").asc()).rangeBetween(-14, 0)
market_last2weeks = Window.partitionBy("customer_id", "market").orderBy(expr("unix_date(placement_date)").asc()).rangeBetween(-14, 0)

data.withColumn('age', ceil(datediff('placement_date', 'birthdate', )/365.25))\
    .withColumn('minutes_before_start',
                  when(
                      round((unix_timestamp('scheduled_begin') - unix_timestamp('placement_time')) / 60) < 0,
                      0)
                  .otherwise(round((unix_timestamp('scheduled_begin') - unix_timestamp('placement_time')) / 60)))\
    .withColumn('section', split(bets_df['betting_types'].getItem(0), ' ').getItem(0))\
    .withColumn('point', split(bets_df['betting_types'].getItem(0), ' ').getItem(1))\
    .withColumn('market', split(bets_df['betting_types'].getItem(0), ' ').getItem(2))\
    .withColumn('market_params', regexp_extract(bets_df['betting_types'].getItem(1), '\(.*?\)', 0))\
    .filter(data['currency'] != 'NEO')\
    .withColumn('stake_eur', round(convert_curr('stake', 'currency', 'placement_date')/100, 2))\
    .select(*columns)\
    .withColumn("dayofweek", dayofweek(col("placement_date")))\
    .withColumn("weekday_stake", round(avg(col("stake_eur")).over(weekday_until_now), 2))\
    .withColumn("section_2w", round(avg(col("stake_eur")).over(section_last2weeks), 2))\
    .withColumn("point_2w", round(avg(col("stake_eur")).over(point_last2weeks), 2))\
    .withColumn("market_2w", round(avg(col("stake_eur")).over(market_last2weeks), 2))\
    .filter(col("age").isNotNull() & col("country").isNotNull())\
    .orderBy(col("placement_time").asc())\
    .write.format("delta").mode("overwrite").save("/Users/andreikirpichev/Downloads/bets")


                                                                                

In [7]:
bets_df = spark.read.format("delta").load("/Users/andreikirpichev/Downloads/bets/")

bets_df.select(countDistinct("market_params")).show()
bets_df.select([count(when(col(c).isNull(), c)).alias(c) for c in bets_df.columns]).show()

+-----------------------------+
|count(DISTINCT market_params)|
+-----------------------------+
|                         2320|
+-----------------------------+





+-----------+---+-------+-------+-----+------+-------------+----------+--------------+--------------+--------------+----+----+------+-----+------+--------------------+---------+---------+-------------+----------+--------+---------+
|customer_id|age|country|section|point|market|market_params|contest_id|placement_date|placement_time|placement_odds|home|away|region|sport|league|minutes_before_start|stake_eur|dayofweek|weekday_stake|section_2w|point_2w|market_2w|
+-----------+---+-------+-------+-----+------+-------------+----------+--------------+--------------+--------------+----+----+------+-----+------+--------------------+---------+---------+-------------+----------+--------+---------+
|          0|  0|      0|      0|    0|     0|      4684742|         0|             0|             0|             0|   0|   0|     0|    0|     0|                   0|        0|        0|            0|         0|       0|        0|
+-----------+---+-------+-------+-----+------+-------------+----------+-

                                                                                