In [1]:
import os

from pyspark.sql import SparkSession
from pyspark.sql import functions as f


# Local Spark session for this notebook: driver bound to localhost, UI on
# port 4051, 5g of driver memory.
spark = (
    SparkSession.builder
    .appName('SparkByExamples.com')
    .config('spark.driver.bindAddress', 'localhost')
    .config("spark.ui.port", "4051")
    .config("spark.driver.memory", "5g")
    .getOrCreate()
)


23/02/13 08:18:42 WARN Utils: Your hostname, Chaturvedi_PC resolves to a loopback address: 127.0.1.1; using 172.27.71.32 instead (on interface eth0)
23/02/13 08:18:42 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
23/02/13 08:18:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
# Location of the processed training rows (read as parquet below).
# Set DREAM11_TRAIN_INPUT_PATH to run on another machine; the default keeps
# the original author's local path so existing runs are unchanged.
train_input_path = os.environ.get(
    'DREAM11_TRAIN_INPUT_PATH',
    '/home/abhay/work/dream11/processed_output/training_rows',
)

In [3]:
# Load the processed training rows as a parquet-backed DataFrame.
train_df = spark.read.format('parquet').load(train_input_path)

23/02/13 08:18:53 WARN SharedInMemoryCache: Evicting cached table partition metadata from memory due to size constraints (spark.sql.hive.filesourcePartitionFileCacheSize = 262144000 bytes). This may impact query planning performance.


In [4]:
# Small slice for quick UDF checks. No orderBy is applied, so which 100 rows
# come back is not pinned down.
limit_df = train_df.limit(num=100)

In [19]:
from typing import Optional
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType

def __extract_batting_fantasy_points(
        batter_runs: Optional[int], dismissals: Optional[int],
        balls_faced: Optional[int], boundaries_count: Optional[int], sixes_count: Optional[int] ):
    """Score a player's batting contribution for one match.

    Args:
        batter_runs: runs scored; None counts as 0 (and is never a duck).
        dismissals: times dismissed; the player is "out" when this is > 0.
        balls_faced: balls faced; None counts as 0.
        boundaries_count: boundaries hit; None adds nothing.
        sixes_count: sixes hit; None adds nothing.

    Returns:
        Batting points:
          * 1 per run, +1 per boundary, +2 per six;
          * +8 for 50-99 runs, or +16 for 100+ runs (not both);
          * strike-rate penalty of -6 below 50, -4 from 50 to <60 and
            -2 from 60 to 70 inclusive. With no balls faced the strike
            rate is taken as 100, so no penalty applies;
          * -2 for a duck (dismissed with exactly 0 runs).
    """
    dismissed = dismissals is not None and dismissals > 0
    # Checked before None-normalisation: a missing run total is not a duck.
    duck = batter_runs is not None and batter_runs == 0 and dismissed

    runs = 0 if batter_runs is None else batter_runs
    balls = 0 if balls_faced is None else balls_faced
    strike_rate = runs * 100.0 / balls if balls > 0 else 100

    points = runs
    if boundaries_count is not None:
        points += boundaries_count
    if sixes_count is not None:
        points += 2 * sixes_count

    # milestone bonus: century replaces (does not add to) the half-century
    if runs >= 100:
        points += 16
    elif runs >= 50:
        points += 8

    # slow-scoring penalty, harshest band first
    if strike_rate < 50:
        points -= 6
    elif strike_rate < 60:
        points -= 4
    elif strike_rate <= 70:
        points -= 2

    if duck:
        points -= 2
    return points

def __extract_bowling_fantasy_points(total_runs: Optional[int], wickets: Optional[int], deliveries: Optional[int], maidens: Optional[int]):
    """Score a player's bowling contribution for one match.

    Args:
        total_runs: runs conceded; None counts as 0.
        wickets: wickets taken; None counts as 0.
        deliveries: balls bowled; None counts as 0.
        maidens: maiden overs; None counts as 0.

    Returns:
        Bowling points:
          * 25 per wicket and 8 per maiden;
          * haul bonus of +8 for 4 wickets, or +16 for 5 or more wickets;
          * economy bonus of +4 below 5 runs per 6 balls, +2 from 5 to <6.
        A player who bowled no deliveries is given an economy of 6, which
        earns no economy bonus.
    """
    total_runs = 0 if total_runs is None else total_runs
    wickets = 0 if wickets is None else wickets
    deliveries = 0 if deliveries is None else deliveries
    maidens = 0 if maidens is None else maidens

    # runs conceded per 6 deliveries; 6 is a neutral value (no bonus)
    economy_rate = total_runs * 6.0 / deliveries if deliveries > 0 else 6

    fantasy_points = wickets * 25 + maidens * 8
    # Haul bonus. Must use >= 5: the previous `wickets > 5` gave exactly
    # 5 wickets no haul bonus at all, while 4 wickets earned +8.
    if wickets >= 5:
        fantasy_points += 16
    elif wickets >= 4:
        fantasy_points += 8
    # economy bonus
    if economy_rate < 5:
        fantasy_points += 4
    elif economy_rate < 6:
        fantasy_points += 2
    return fantasy_points

def __extract_fielding_fantasy_points(fielding_wickets: Optional[int]):
    """Score a player's fielding contribution.

    Each fielding dismissal is worth a flat 7 points, an average chosen to
    cover the different dismissal mechanisms. None means no fielding
    dismissals and scores 0.
    """
    if fielding_wickets is None:
        return 0
    return fielding_wickets * 7

def get_fantasy_points(
        batter_runs: Optional[int], dismissals: Optional[int], balls_faced: Optional[int], boundaries_count: Optional[int], 
        sixes_count: Optional[int],
        total_runs: Optional[int], wickets: Optional[int], deliveries: Optional[int], maidens: Optional[int],
        fielding_wickets: Optional[int]
    ):
    """Total fantasy points for one player in one match.

    Adds the batting, bowling and fielding scores and a flat bonus for being
    selected. Each argument may be None for a missing statistic; the
    component scorers treat None as zero or absent.
    """
    selection_bonus = 4  # awarded to every selected player
    batting_points = __extract_batting_fantasy_points(
        batter_runs, dismissals, balls_faced, boundaries_count, sixes_count)
    bowling_points = __extract_bowling_fantasy_points(
        total_runs, wickets, deliveries, maidens)
    fielding_points = __extract_fielding_fantasy_points(fielding_wickets)
    return batting_points + bowling_points + fielding_points + selection_bonus
    
# Expose the pure-Python scorer to Spark as a UDF producing a LongType column.
get_fantasy_points_udf = udf(f=get_fantasy_points, returnType=LongType())

In [21]:
# Score each sampled row. Columns are passed in get_fantasy_points' positional
# order: batting stats, then bowling stats, then fielding.
test_df = limit_df.withColumn(
    "some_Col",
    get_fantasy_points_udf(*[
        limit_df[column_name]
        for column_name in (
            'batter_run_sum', 'dismissals', 'balls_faced',
            'boundary_count', 'six_count',
            'total_run_sum', 'wicket_sum', 'deliveries', 'maiden_count',
            'fielding_wicket_sum',
        )
    ]),
)

In [20]:
# UDF inputs side by side with the UDF result.
test_df.select(
    'batter_run_sum', 'dismissals', 'balls_faced',
    'boundary_count', 'six_count',
    'total_run_sum', 'wicket_sum', 'deliveries', 'maiden_count',
    'fielding_wicket_sum', "some_Col",
)

Unnamed: 0,batter_run_sum,dismissals,balls_faced,boundary_count,six_count,total_run_sum,wicket_sum,deliveries,maiden_count,fielding_wicket_sum,some_Col
0,101.0,1.0,55.0,5.0,8.0,28.0,3.0,25.0,0.0,1.0,
1,4.0,1.0,7.0,1.0,0.0,,,,,,
2,,,,,,26.0,1.0,24.0,0.0,1.0,
3,10.0,1.0,18.0,0.0,0.0,25.0,3.0,24.0,0.0,,
4,22.0,0.0,14.0,4.0,0.0,16.0,0.0,16.0,0.0,,
...,...,...,...,...,...,...,...,...,...,...,...
95,13.0,1.0,13.0,2.0,0.0,,,,,1.0,
96,0.0,1.0,1.0,0.0,0.0,24.0,1.0,24.0,0.0,1.0,
97,3.0,0.0,3.0,0.0,0.0,11.0,0.0,12.0,0.0,,
98,6.0,1.0,7.0,0.0,0.0,,,,,,


In [23]:
# Print the inputs, the stored fantasy_points column and the UDF result.
test_df.select(
    'batter_run_sum', 'dismissals', 'balls_faced',
    'boundary_count', 'six_count',
    'total_run_sum', 'wicket_sum', 'deliveries', 'maiden_count',
    'fielding_wicket_sum', "fantasy_points", "some_Col",
).show()

[Stage 13:>                                                         (0 + 1) / 1]

+--------------+----------+-----------+--------------+---------+-------------+----------+----------+------------+-------------------+--------------+--------+
|batter_run_sum|dismissals|balls_faced|boundary_count|six_count|total_run_sum|wicket_sum|deliveries|maiden_count|fielding_wicket_sum|fantasy_points|some_Col|
+--------------+----------+-----------+--------------+---------+-------------+----------+----------+------------+-------------------+--------------+--------+
|           101|         1|         55|             5|        8|           28|         3|        25|           0|                  1|          null|     224|
|             4|         1|          7|             1|        0|         null|      null|      null|        null|               null|          null|       5|
|          null|      null|       null|          null|     null|           26|         1|        24|           0|                  1|          null|      36|
|            10|         1|         18|             

23/02/13 08:44:17 WARN JavaUtils: Attempt to delete using native Unix OS command failed for path = /tmp/blockmgr-8117d7b3-f9f2-4991-8937-28a164c0580e. Falling back to Java IO way
java.io.IOException: Failed to delete: /tmp/blockmgr-8117d7b3-f9f2-4991-8937-28a164c0580e
	at org.apache.spark.network.util.JavaUtils.deleteRecursivelyUsingUnixNative(JavaUtils.java:171)
	at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:110)
	at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:91)
	at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:1141)
	at org.apache.spark.storage.DiskBlockManager.$anonfun$doStop$1(DiskBlockManager.scala:182)
	at org.apache.spark.storage.DiskBlockManager.$anonfun$doStop$1$adapted(DiskBlockManager.scala:178)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach

----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 58038)
Traceback (most recent call last):
  File "/home/abhay/work/miniconda3/envs/dream11/lib/python3.9/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/home/abhay/work/miniconda3/envs/dream11/lib/python3.9/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/home/abhay/work/miniconda3/envs/dream11/lib/python3.9/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/home/abhay/work/miniconda3/envs/dream11/lib/python3.9/socketserver.py", line 747, in __init__
    self.handle()
  File "/home/abhay/work/miniconda3/envs/dream11/lib/python3.9/site-packages/pyspark/accumulators.py", line 262, in handle
    poll(accum_updates)
  File "/home/abhay/work/miniconda3/envs/dream11/lib/python3.9/site-packages/pyspark/a