### Create a Spark session with 2 cores and 2 GB of driver memory (you will work locally)

In [1]:
import os
import sys
# Make PySpark (both the PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON settings)
# use the same interpreter that is running this kernel.
os.environ.update(
    PYSPARK_PYTHON=sys.executable,
    PYSPARK_DRIVER_PYTHON=sys.executable,
)

In [2]:
from pyspark.sql import SparkSession

# Local session: master "local[2]", application name "LAB3", 2g of driver memory.
spark = (
    SparkSession.builder
    .master("local[2]")
    .appName("LAB3")
    .config("spark.driver.memory", "2g")
    .getOrCreate()
)

### Use the Spark UI link below to see the tasks submitted to Spark. You can also see cached data there

In [3]:
spark

In [4]:
import random

from pyspark.sql import Row
from pyspark.sql.functions import col, udf, monotonically_increasing_id, mean, row_number
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.window import Window

### Read dataset

In [5]:
path = os.path.join("data", "cards.json")

In [6]:
data = spark.read.json(path)

In [7]:
data.show(1, vertical=True)

-RECORD 0----------------------------------
 artist             | Zoltan & Gabor       
 attack             | null                 
 cardClass          | SHAMAN               
 classes            | null                 
 collectible        | true                 
 collectionText     | null                 
 cost               | 2                    
 dbfId              | 404                  
 durability         | null                 
 elite              | null                 
 entourage          | null                 
 faction            | null                 
 flavor             | It was just a fle... 
 health             | null                 
 hideStats          | null                 
 howToEarn          | null                 
 howToEarnGolden    | null                 
 id                 | CS2_038              
 mechanics          | null                 
 multiClassGroup    | null                 
 name               | Ancestral Spirit     
 overload           | null      

### Remove samples without an artist name, attack value, or health value

In [8]:
# Keep only the rows where artist, attack and health are all non-null.
data_authors = data.filter(
    col('artist').isNotNull()
    & col('attack').isNotNull()
    & col('health').isNotNull()
)

In [9]:
data_authors.cache()

DataFrame[artist: string, attack: bigint, cardClass: string, classes: array<string>, collectible: boolean, collectionText: string, cost: bigint, dbfId: bigint, durability: bigint, elite: boolean, entourage: array<string>, faction: string, flavor: string, health: bigint, hideStats: boolean, howToEarn: string, howToEarnGolden: string, id: string, mechanics: array<string>, multiClassGroup: string, name: string, overload: bigint, playRequirements: struct<REQ_DAMAGED_TARGET:bigint,REQ_ENEMY_TARGET:bigint,REQ_ENEMY_WEAPON_EQUIPPED:bigint,REQ_ENTIRE_ENTOURAGE_NOT_IN_PLAY:bigint,REQ_FRIENDLY_MINION_DIED_THIS_GAME:bigint,REQ_FRIENDLY_TARGET:bigint,REQ_FROZEN_TARGET:bigint,REQ_HERO_TARGET:bigint,REQ_LEGENDARY_TARGET:bigint,REQ_MINIMUM_ENEMY_MINIONS:bigint,REQ_MINIMUM_TOTAL_MINIONS:bigint,REQ_MINION_OR_ENEMY_HERO:bigint,REQ_MINION_SLOT_OR_MANA_CRYSTAL_SLOT:bigint,REQ_MINION_TARGET:bigint,REQ_MUST_TARGET_TAUNTER:bigint,REQ_NONSELF_TARGET:bigint,REQ_NUM_MINION_SLOTS:bigint,REQ_SECRET_CAP_FOR_NON_SE

### For each column, print the number of non-null rows and the number of unique values

In [10]:
# One report line per column: how many rows are non-null and how many distinct
# values the column holds. distinct() keeps null as a value of its own, so a
# column containing nulls reports one more "unique" value than its non-null distincts.
report_template = '{}. Non empty: {}\tUnique values: {}'
for column_name in data_authors.columns:
    column_values = data_authors.select(column_name)
    non_null_total = column_values.filter(col(column_name).isNotNull()).count()
    distinct_total = column_values.distinct().count()
    print(report_template.format(column_name, non_null_total, distinct_total))

artist. Non empty: 829	Unique values: 234
attack. Non empty: 829	Unique values: 31
cardClass. Non empty: 829	Unique values: 10
classes. Non empty: 9	Unique values: 4
collectible. Non empty: 731	Unique values: 2
collectionText. Non empty: 5	Unique values: 6
cost. Non empty: 829	Unique values: 14
dbfId. Non empty: 829	Unique values: 829
durability. Non empty: 0	Unique values: 1
elite. Non empty: 141	Unique values: 2
entourage. Non empty: 6	Unique values: 7
faction. Non empty: 60	Unique values: 3
flavor. Non empty: 731	Unique values: 732
health. Non empty: 829	Unique values: 30
hideStats. Non empty: 2	Unique values: 2
howToEarn. Non empty: 136	Unique values: 27
howToEarnGolden. Non empty: 181	Unique values: 77
id. Non empty: 829	Unique values: 829
mechanics. Non empty: 553	Unique values: 49
multiClassGroup. Non empty: 9	Unique values: 4
name. Non empty: 829	Unique values: 791
overload. Non empty: 9	Unique values: 4
playRequirements. Non empty: 81	Unique values: 34
playerClass. Non empty: 

In [11]:
data_authors.select('race').distinct().show()

+----------+
|      race|
+----------+
|MECHANICAL|
|      null|
|    MURLOC|
|    DRAGON|
|     TOTEM|
|     BEAST|
|     DEMON|
|    PIRATE|
+----------+



In [12]:
data_authors.select('rarity').distinct().show()

+---------+
|   rarity|
+---------+
|     null|
|     FREE|
|     EPIC|
|     RARE|
|   COMMON|
|LEGENDARY|
+---------+



In [13]:
# Integer code assigned to each rarity label.
# NOTE(review): LEGENDARY (4) is coded below EPIC (5) — confirm this ordering is
# intended, since the codes are averaged later on.
str2int_rarity = dict([
    ('FREE', 1),
    ('COMMON', 2),
    ('RARE', 3),
    ('LEGENDARY', 4),
    ('EPIC', 5),
    ('NULL', 0),
])

In [14]:
# Using the dict above, create a new column named "rarity_int" in which each rarity string is converted to an integer
# The "col" function returns a reference ("pointer") to the requested column

In [15]:
# Translate each rarity label through str2int_rarity; any value that is not a
# key of the dict (a missing rarity included) is mapped to 0.
_rarity_to_int = udf(lambda label: str2int_rarity.get(label, 0), IntegerType())
data_authors = data_authors.withColumn('rarity_int', _rarity_to_int(col('rarity')))

In [16]:
data_authors.show(1, vertical=True)

-RECORD 0----------------------------------
 artist             | John Polidora        
 attack             | 10                   
 cardClass          | NEUTRAL              
 classes            | null                 
 collectible        | true                 
 collectionText     | null                 
 cost               | 9                    
 dbfId              | 2725                 
 durability         | null                 
 elite              | true                 
 entourage          | null                 
 faction            | null                 
 flavor             | This massive yeti... 
 health             | 10                   
 hideStats          | null                 
 howToEarn          | null                 
 howToEarnGolden    | null                 
 id                 | AT_125               
 mechanics          | [CHARGE]             
 multiClassGroup    | null                 
 name               | Icehowl              
 overload           | null      

### Calculate average rarity of each race

In [17]:
# Mean of the integer rarity code within each race, displayed as a table.
(
    data_authors
    .groupBy('race')
    .agg(mean('rarity_int').alias('average rarity'))
    .show()
)

+----------+------------------+
|      race|    average rarity|
+----------+------------------+
|MECHANICAL|3.0185185185185186|
|      null|2.6536502546689302|
|    MURLOC| 2.652173913043478|
|    DRAGON|3.2222222222222223|
|     TOTEM|               2.5|
|     BEAST|               2.3|
|     DEMON|2.3333333333333335|
|    PIRATE|2.8947368421052633|
+----------+------------------+



In [18]:
@udf(returnType=IntegerType())
def calculate_number_of_attacks(health, attack):
    """Return how many hits of strength ``attack`` are needed to remove ``health`` points.

    Args:
        health: Health points to remove (integer, or None for a SQL null).
        attack: Damage dealt per hit (integer, or None for a SQL null).

    Returns:
        The ceiling of ``health / attack``; ``-1`` when ``attack`` is 0 (no number
        of hits is enough); ``None`` (SQL null) when either input is null.
    """
    # SQL nulls reach a Python UDF as None. Propagate null instead of letting the
    # arithmetic below raise TypeError inside the worker and fail the whole job.
    if health is None or attack is None:
        return None
    if attack == 0:
        return -1
    # Ceiling division with integer arithmetic only.
    return -(-health // attack)

In [19]:
# Number of hits each card needs to destroy a copy of itself:
# the card's own attack applied against its own health.
data_authors_2 = data_authors.select(
    'health',
    'attack',
    calculate_number_of_attacks(col('health'), col('attack')).alias('Attack to kill itself'),
)

In [20]:
data_authors_2.show(5)

+------+------+---------------------+
|health|attack|Attack to kill itself|
+------+------+---------------------+
|    10|    10|                    1|
|     5|     3|                    2|
|     3|     2|                    2|
|     2|     2|                    1|
|     4|     5|                    1|
+------+------+---------------------+
only showing top 5 rows



In [21]:
# Here we do some work to randomize the indexes of the dataframe. It is not always easy to assign unique, non-monotonic values to each row

In [22]:
# Candidate ids 0 .. n-1, one per row, for the left and the right side.
# Exactly n ids (not n + 1): with a spare id, each side would leave a different
# id unassigned after shuffling, and a row could end up without a join partner.
# count() runs a Spark job, so it is evaluated once and reused.
rows_total = data_authors.count()
random_ids_left = list(range(rows_total))
random_ids_right = list(range(rows_total))

In [23]:
random_ids_left[:10]

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [24]:
random_ids_right[:10]

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [25]:
# Shuffle both id lists in place with a dedicated, seeded generator so that the
# pairing is identical on every Restart & Run All, without touching the global
# `random` state.
RANDOM_SEED = 42
_id_shuffler = random.Random(RANDOM_SEED)
_id_shuffler.shuffle(random_ids_left)
_id_shuffler.shuffle(random_ids_right)

In [26]:
# Give every row a consecutive index 0 .. n-1.
# monotonically_increasing_id() on its own is unique and increasing but NOT
# consecutive (the partition id is encoded in the upper bits), so it cannot be
# used safely as a list index once the data spans more than one partition.
# row_number() over a global ordering is 1-based and gap-free; subtracting 1
# makes it 0-based. The window has no partitioning, so all rows are moved to a
# single partition — acceptable for a dataset of this size.
data_authors = data_authors.withColumn(
    'unique_id',
    row_number().over(Window.orderBy(monotonically_increasing_id())) - 1,
)

In [27]:
# Attach the shuffled ids: the row with unique_id i receives random_ids_left[i]
# as left_id and random_ids_right[i] as right_id. These are the join keys.
_pick_left_id = udf(lambda row_index: random_ids_left[row_index], IntegerType())
_pick_right_id = udf(lambda row_index: random_ids_right[row_index], IntegerType())
data_authors = (
    data_authors
    .withColumn('left_id', _pick_left_id(col('unique_id')))
    .withColumn('right_id', _pick_right_id(col('unique_id')))
)

In [28]:
data_authors.cache()

DataFrame[artist: string, attack: bigint, cardClass: string, classes: array<string>, collectible: boolean, collectionText: string, cost: bigint, dbfId: bigint, durability: bigint, elite: boolean, entourage: array<string>, faction: string, flavor: string, health: bigint, hideStats: boolean, howToEarn: string, howToEarnGolden: string, id: string, mechanics: array<string>, multiClassGroup: string, name: string, overload: bigint, playRequirements: struct<REQ_DAMAGED_TARGET:bigint,REQ_ENEMY_TARGET:bigint,REQ_ENEMY_WEAPON_EQUIPPED:bigint,REQ_ENTIRE_ENTOURAGE_NOT_IN_PLAY:bigint,REQ_FRIENDLY_MINION_DIED_THIS_GAME:bigint,REQ_FRIENDLY_TARGET:bigint,REQ_FROZEN_TARGET:bigint,REQ_HERO_TARGET:bigint,REQ_LEGENDARY_TARGET:bigint,REQ_MINIMUM_ENEMY_MINIONS:bigint,REQ_MINIMUM_TOTAL_MINIONS:bigint,REQ_MINION_OR_ENEMY_HERO:bigint,REQ_MINION_SLOT_OR_MANA_CRYSTAL_SLOT:bigint,REQ_MINION_TARGET:bigint,REQ_MUST_TARGET_TAUNTER:bigint,REQ_NONSELF_TARGET:bigint,REQ_NUM_MINION_SLOTS:bigint,REQ_SECRET_CAP_FOR_NON_SE

In [29]:
data_authors.count()

829

In [30]:
data_authors.show(1, vertical=True)

-RECORD 0----------------------------------
 artist             | John Polidora        
 attack             | 10                   
 cardClass          | NEUTRAL              
 classes            | null                 
 collectible        | true                 
 collectionText     | null                 
 cost               | 9                    
 dbfId              | 2725                 
 durability         | null                 
 elite              | true                 
 entourage          | null                 
 faction            | null                 
 flavor             | This massive yeti... 
 health             | 10                   
 hideStats          | null                 
 howToEarn          | null                 
 howToEarnGolden    | null                 
 id                 | AT_125               
 mechanics          | [CHARGE]             
 multiClassGroup    | null                 
 name               | Icehowl              
 overload           | null      

In [31]:
# For each sample, select another sample such that left_id == right_id, and combine the two into a new row
# The resulting dataframe consists of [left_id, name, attack, health, right_id, right_name, right_attack, right_health]

# Small hint: data_authors.select(col('name').alias('right_name'))
# Small hint: you can join table with itself

In [32]:
# Self-join: each row (left side, keyed by left_id) is paired with the row whose
# right_id holds the same value. Both sides expose their key under a temporary
# "join_id" column, which is dropped once the join is done.
left_cards = data_authors.select(
    col("left_id").alias("join_id"),
    "left_id",
    "name",
    "attack",
    "health",
)
right_cards = data_authors.select(
    col("right_id").alias("join_id"),
    "right_id",
    col("name").alias("right_name"),
    col("attack").alias("right_attack"),
    col("health").alias("right_health"),
)
data_crossed = left_cards.join(right_cards, on="join_id", how="inner").drop("join_id")

In [33]:
data_crossed.cache()

DataFrame[left_id: int, name: string, attack: bigint, health: bigint, right_id: int, right_name: string, right_attack: bigint, right_health: bigint]

In [34]:
data_crossed.show(5)

+-------+-----------------+------+------+--------+-----------------+------------+------------+
|left_id|             name|attack|health|right_id|       right_name|right_attack|right_health|
+-------+-----------------+------+------+--------+-----------------+------------+------------+
|    570|          Icehowl|    10|    10|     570|Illidan Stormrage|           7|           5|
|    343|     Wailing Soul|     3|     5|     343|      Deadly Fork|           3|           2|
|    527|Sunfury Protector|     2|     3|     527|    Hungry Dragon|           5|           6|
|    209|     Dark Peddler|     2|     2|     209|     Ivory Knight|           4|           4|
|    123| Genzo, the Shark|     5|     4|     123|        Flame Imp|           3|           2|
+-------+-----------------+------+------+--------+-----------------+------------+------------+
only showing top 5 rows



In [35]:
# Hits the left card needs to destroy the right card:
# the left card's attack applied against the right card's health.
attacks_to_kill = calculate_number_of_attacks(col('right_health'), col('attack'))
data_crossed = data_crossed.withColumn('Attacks to kill', attacks_to_kill)

In [36]:
data_crossed.filter(col('Attacks to kill') > 10).show(10)

+-------+--------------------+------+------+--------+---------------+------------+------------+---------------+
|left_id|                name|attack|health|right_id|     right_name|right_attack|right_health|Attacks to kill|
+-------+--------------------+------+------+--------+---------------+------------+------------+---------------+
|     50|   Southsea Deckhand|     2|     1|      50|The Ancient One|          30|          30|             15|
|     57|             Mastiff|     1|     1|      57|     Jade Golem|          24|          24|             24|
|    745|Archmage's Appren...|     2|     4|     745|     Jade Golem|          27|          27|             14|
|    465|    Warhorse Trainer|     2|     4|     465|     Jade Golem|          26|          26|             13|
|    669|       Micro Machine|     1|     2|     669|     Jade Golem|          21|          21|             21|
+-------+--------------------+------+------+--------+---------------+------------+------------+---------

In [37]:
spark.stop()

In [None]:
"""
Additional task OPTIONAL

Calculate who will win (left or right), if left one starts first
"""