In [None]:
"""
Create a Spark session with 2 cores and 2 GB of driver memory (you will work locally)
"""

In [1]:
from pyspark.sql import SparkSession

# Local session for the lab: master "local[2]", driver memory "2g".
spark = (
    SparkSession.builder
    .master("local[2]")
    .appName("lab3")
    .config("spark.driver.memory", "2g")
    .getOrCreate()
)


In [None]:
"""
Use the Spark UI link below to inspect the tasks submitted to Spark.
It also shows which values are cached.
"""

In [2]:
# Bare expression: the notebook displays the session object as this cell's output.
spark

In [3]:
import os
import random

from pyspark.sql import Row
from pyspark.sql.functions import col, udf, monotonically_increasing_id, mean, row_number
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.window import Window

In [None]:
"""
Read dataset
"""

In [4]:
# Location of the cards JSON file. Set the CARDS_JSON_PATH environment variable to
# point at your own copy; the path used when the notebook was written stays as the
# fallback so existing setups keep working.
data_path = os.environ.get('CARDS_JSON_PATH', 'C://Users//Daan_//Downloads//cards.json')

# NOTE(review): the "inferSchema" option is kept exactly as before; the JSON reader
# appears to infer the schema by itself - confirm whether the option has any effect.
data = spark.read.option("inferSchema", "true").json(data_path)

# Preview the first 10 rows.
data.show(10)

+----------------+------+---------+-------+-----------+--------------+----+-----+----------+-----+--------------------+-------+--------------------+------+---------+---------+---------------+---------+--------------------+---------------+-------------------+--------+--------------------+-----------+----+---------+--------------+-------+-----------+------------------+--------------------+-----------+
|          artist|attack|cardClass|classes|collectible|collectionText|cost|dbfId|durability|elite|           entourage|faction|              flavor|health|hideStats|howToEarn|howToEarnGolden|       id|           mechanics|multiClassGroup|               name|overload|    playRequirements|playerClass|race|   rarity|referencedTags|    set|spellDamage|targetingArrowText|                text|       type|
+----------------+------+---------+-------+-----------+--------------+----+-----+----------+-----+--------------------+-------+--------------------+------+---------+---------+---------------+---

In [None]:
"""
Remove samples without artist name, attack and health value
"""

In [5]:
# Keep only the cards that have an artist, an attack value and a health value.
# The filtered frame is marked for caching because later cells query it repeatedly.
has_required_fields = (
    col('artist').isNotNull()
    & col('attack').isNotNull()
    & col('health').isNotNull()
)
data_authors = data.filter(has_required_fields).cache()

In [None]:
"""
For each column, print the number of non-null rows and the number of unique values
"""

In [6]:
# Per-column summary: how many rows carry a value, and how many distinct values exist.
# distinct() keeps NULL as a value of its own, so a column that is entirely NULL
# still reports one unique value.
for column_name in data_authors.columns:
    single_column = data_authors.select(column_name)
    present = single_column.where(col(column_name).isNotNull()).count()
    unique = single_column.distinct().count()
    print('{}. Non empty: {}\tUnique values: {}'.format(column_name, present, unique))

artist. Non empty: 829	Unique values: 234
attack. Non empty: 829	Unique values: 31
cardClass. Non empty: 829	Unique values: 10
classes. Non empty: 9	Unique values: 4
collectible. Non empty: 731	Unique values: 2
collectionText. Non empty: 5	Unique values: 6
cost. Non empty: 829	Unique values: 14
dbfId. Non empty: 829	Unique values: 829
durability. Non empty: 0	Unique values: 1
elite. Non empty: 141	Unique values: 2
entourage. Non empty: 6	Unique values: 7
faction. Non empty: 60	Unique values: 3
flavor. Non empty: 731	Unique values: 732
health. Non empty: 829	Unique values: 30
hideStats. Non empty: 2	Unique values: 2
howToEarn. Non empty: 136	Unique values: 27
howToEarnGolden. Non empty: 181	Unique values: 77
id. Non empty: 829	Unique values: 829
mechanics. Non empty: 553	Unique values: 49
multiClassGroup. Non empty: 9	Unique values: 4
name. Non empty: 829	Unique values: 791
overload. Non empty: 9	Unique values: 4
playRequirements. Non empty: 81	Unique values: 34
playerClass. Non empty: 

In [7]:
# List the distinct values of the race column (NULL is listed as its own row).
data_authors.select('race').distinct().show()

+----------+
|      race|
+----------+
|MECHANICAL|
|    MURLOC|
|    DRAGON|
|     TOTEM|
|     BEAST|
|     DEMON|
|    PIRATE|
|      NULL|
+----------+



In [8]:
# List the distinct values of the rarity column (NULL is listed as its own row).
data_authors.select('rarity').distinct().show()

+---------+
|   rarity|
+---------+
|     FREE|
|     EPIC|
|     RARE|
|   COMMON|
|LEGENDARY|
|     NULL|
+---------+



In [9]:
# Rarity label -> integer code. The code is the label's position in this tuple.
# NOTE(review): LEGENDARY (4) is coded below EPIC (5); if the codes are meant to be
# ordinal, confirm this order is intended before averaging them.
# NOTE(review): 'NULL' is a literal string key; a missing rarity presumably reaches
# Python as None rather than 'NULL' - verify how the caller handles that.
_rarity_labels = ('NULL', 'FREE', 'COMMON', 'RARE', 'LEGENDARY', 'EPIC')
str2int_rarity = {label: code for code, label in enumerate(_rarity_labels)}

In [None]:
"""
Using the dict above, create a new column named "rarity_int" in which each rarity string
is converted to an integer.

The "col" function returns a reference to the requested column.
"""

In [10]:
def _rarity_code(rarity_label):
    """Look up the integer code for a rarity label; labels not in the dict map to 0."""
    return str2int_rarity.get(rarity_label, 0)


# Wrap the lookup as an integer-typed UDF and store its result in 'rarity_int'.
rarity_to_int_udf = udf(_rarity_code, IntegerType())
data_authors = data_authors.withColumn(
    'rarity_int',
    rarity_to_int_udf(col('rarity')),
)

In [None]:
"""
Calculate average rarity of each race
"""

In [11]:
# Mean rarity code per race; cards without a race end up in their own NULL group.
average_rarity = mean(col('rarity_int')).alias('average rarity')
data_authors.groupBy('race').agg(average_rarity).show()

+----------+------------------+
|      race|    average rarity|
+----------+------------------+
|MECHANICAL|3.0185185185185186|
|      NULL|2.6536502546689302|
|    MURLOC| 2.652173913043478|
|    DRAGON|3.2222222222222223|
|     TOTEM|               2.5|
|     BEAST|               2.3|
|     DEMON|2.3333333333333335|
|    PIRATE|2.8947368421052633|
+----------+------------------+



In [12]:
@udf(returnType=IntegerType())
def calculate_number_of_attacks(health, attack):
    """Return how many hits of strength ``attack`` are needed to use up ``health``.

    Args:
        health: Health of the card being attacked.
        attack: Attack value of the attacking card.

    Returns:
        The ceiling of ``health / attack``; ``-1`` when ``attack`` is 0 (no number
        of hits is enough); ``None`` (SQL NULL) when either input is missing.
    """
    # SQL NULLs arrive in a Python UDF as None. Propagate NULL instead of letting
    # the arithmetic below raise a TypeError and fail the whole job.
    if health is None or attack is None:
        return None
    if attack == 0:
        return -1
    # Ceiling division using integer arithmetic only.
    full_hits, remainder = divmod(health, attack)
    return full_hits + (1 if remainder else 0)

In [None]:
"""
Using the udf defined above, calculate how many attacks each card needs to kill itself
"""

In [13]:
# Apply the UDF to each card's own health and attack, keeping only the three
# columns needed to inspect the result.
attacks_to_kill_itself = calculate_number_of_attacks(col('health'), col('attack'))
data_authors_2 = data_authors.select(
    'health',
    'attack',
    attacks_to_kill_itself.alias('Attack to kill itself'),
)

In [14]:
# Preview the first 5 rows of the computed column next to its inputs.
data_authors_2.show(5)

+------+------+---------------------+
|health|attack|Attack to kill itself|
+------+------+---------------------+
|    10|    10|                    1|
|     5|     3|                    2|
|     3|     2|                    2|
|     2|     2|                    1|
|     4|     5|                    1|
+------+------+---------------------+
only showing top 5 rows



In [None]:
"""
Here is some work on randomizing indexes for the dataframe.
It is not always easy to assign unique values to each row non-monotonically.
"""

In [15]:
# Count once: every count() call runs a separate Spark job.
rows_total = data_authors.count()

# Exactly one id per row (0 .. n-1), assuming rows are indexed 0 .. n-1 below.
# With a spare (n+1)-th id, the two shuffled lists could each leave a different
# value unused, and one row would then find no partner in the left_id == right_id join.
random_ids_left = list(range(rows_total))
random_ids_right = list(range(rows_total))

In [16]:
# Shuffle with a dedicated, seeded generator so the pairing is the same on every
# Restart & Run All, without altering the global `random` module state.
# The generator advances between the two calls, so the lists get different orders.
shuffle_rng = random.Random(42)
shuffle_rng.shuffle(random_ids_left)
shuffle_rng.shuffle(random_ids_right)

In [None]:
"""
Create monotonic id
"""

In [17]:
# monotonically_increasing_id() is unique and increasing but NOT consecutive: the
# partition index is encoded in the upper bits, so with more than one partition the
# values jump far beyond the row count. The id is later used as a list index, so it
# must be a dense 0 .. n-1 sequence. row_number() over that ordering gives 1 .. n;
# subtracting 1 makes it zero-based.
# The window has no partitioning, so Spark gathers all rows into a single partition
# for this step - acceptable for a dataset of this size.
data_authors = data_authors.withColumn(
    'unique_id',
    row_number().over(Window.orderBy(monotonically_increasing_id())) - 1,
)

In [None]:
"""
Create left and right indexes for each sample. You will use them for join operation
"""

In [18]:
# Each UDF looks up a row's shuffled id by using 'unique_id' as an index into the
# corresponding Python list.
left_id_udf = udf(lambda row_index: random_ids_left[row_index], IntegerType())
right_id_udf = udf(lambda row_index: random_ids_right[row_index], IntegerType())

# Attach both ids and mark the result for caching, since it is joined with itself later.
data_authors = (
    data_authors
    .withColumn('left_id', left_id_udf(col('unique_id')))
    .withColumn('right_id', right_id_udf(col('unique_id')))
    .cache()
)

In [19]:
# Row count of the frame; count() is an action, so Spark evaluates the pending transformations here.
data_authors.count()

829

In [None]:
"""
For each sample, select another sample such that left_id == right_id and create a new row.
The resulting dataframe consists of [left_id, name, attack, health, right_id, right_name, right_attack, right_health]
"""

In [None]:
"""
Small hint: data_authors.select(col('name').alias('right_name'))
"""

In [None]:
"""
Small hint: you can join table with itself
"""

In [20]:
# Self-join: two aliases of the same frame, paired wherever the left row's left_id
# equals the right row's right_id.
left_cards = data_authors.alias('left')
right_cards = data_authors.alias('right')
pairing_condition = col('left.left_id') == col('right.right_id')

# Left-side columns keep their names; right-side columns get a 'right_' prefix.
paired_columns = [
    col('left.left_id').alias('left_id'),
    col('left.name'),
    col('left.attack'),
    col('left.health'),
    col('right.right_id').alias('right_id'),
    col('right.name').alias('right_name'),
    col('right.attack').alias('right_attack'),
    col('right.health').alias('right_health'),
]

data_crossed = left_cards.join(right_cards, pairing_condition).select(*paired_columns)

data_crossed.show(1)

+-------+-------+------+------+--------+--------------------+------------+------------+
|left_id|   name|attack|health|right_id|          right_name|right_attack|right_health|
+-------+-------+------+------+--------+--------------------+------------+------------+
|      0|Icehowl|    10|    10|       0|Mistress of Mixtures|           2|           2|
+-------+-------+------+------+--------+--------------------+------------+------------+
only showing top 1 row



In [None]:
"""
Now calculate how many attacks are required for the left card to kill the right card
"""

In [21]:
# The left card attacks and the right card defends: the right card's health is
# measured against the left card's attack.
attacks_to_kill = calculate_number_of_attacks(col('right_health'), col('attack'))
data_crossed = data_crossed.withColumn('Attacks to kill', attacks_to_kill)

In [22]:
# Pairings where the attacker needs more than 10 hits; display up to 100 of them.
data_crossed.filter(col('Attacks to kill') > 10).show(100)

+-------+------------------+------+------+--------+----------+------------+------------+---------------+
|left_id|              name|attack|health|right_id|right_name|right_attack|right_health|Attacks to kill|
+-------+------------------+------+------+--------+----------+------------+------------+---------------+
|    440|  Grimscale Oracle|     1|     1|     440|Jade Golem|          16|          16|             16|
|    732|Meanstreet Marshal|     1|     2|     732|Jade Golem|          25|          25|             25|
|    393| Warsong Commander|     2|     3|     393|Jade Golem|          22|          22|             11|
|     88|        Voidwalker|     1|     3|      88|Jade Golem|          24|          24|             24|
|    708|  Bolvar Fordragon|     1|     7|     708|Jade Golem|          20|          20|             20|
+-------+------------------+------+------+--------+----------+------------+------------+---------------+

