In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=eb21fb7c11c1c45c1aaee75236e5db8dcbeb41f23b74bb23ca11627947fdcbdd
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [5]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]")\
    .appName("PySpark")\
    .config("spark.executor.memory", "2g")\
    .getOrCreate()

In [63]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [64]:
import random
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col, udf, monotonically_increasing_id, mean

In [65]:
data = spark.read.json('/content/drive/MyDrive/datasets_itmo/cards.json')

In [66]:
data.show(10)

+----------------+------+---------+-------+-----------+--------------+----+-----+----------+-----+--------------------+-------+--------------------+------+---------+---------+---------------+---------+--------------------+---------------+-------------------+--------+--------------------+-----------+----+---------+--------------+-------+-----------+------------------+--------------------+-----------+
|          artist|attack|cardClass|classes|collectible|collectionText|cost|dbfId|durability|elite|           entourage|faction|              flavor|health|hideStats|howToEarn|howToEarnGolden|       id|           mechanics|multiClassGroup|               name|overload|    playRequirements|playerClass|race|   rarity|referencedTags|    set|spellDamage|targetingArrowText|                text|       type|
+----------------+------+---------+-------+-----------+--------------+----+-----+----------+-----+--------------------+-------+--------------------+------+---------+---------+---------------+---

In [67]:
data = data.na.drop(subset=["artist", "attack", "health"])
data_authors = data.cache()

In [68]:
for column_name in data_authors.columns:
    rows_number_not_empty = data_authors.filter(col(column_name).isNotNull()).count()
    rows_number_unique = data_authors.select(column_name).distinct().count() # distinct operation may help

    print('{}. Non empty: {}\tUnique values: {}'.format(column_name, rows_number_not_empty, rows_number_unique))

artist. Non empty: 829	Unique values: 234
attack. Non empty: 829	Unique values: 31
cardClass. Non empty: 829	Unique values: 10
classes. Non empty: 9	Unique values: 4
collectible. Non empty: 731	Unique values: 2
collectionText. Non empty: 5	Unique values: 6
cost. Non empty: 829	Unique values: 14
dbfId. Non empty: 829	Unique values: 829
durability. Non empty: 0	Unique values: 1
elite. Non empty: 141	Unique values: 2
entourage. Non empty: 6	Unique values: 7
faction. Non empty: 60	Unique values: 3
flavor. Non empty: 731	Unique values: 732
health. Non empty: 829	Unique values: 30
hideStats. Non empty: 2	Unique values: 2
howToEarn. Non empty: 136	Unique values: 27
howToEarnGolden. Non empty: 181	Unique values: 77
id. Non empty: 829	Unique values: 829
mechanics. Non empty: 553	Unique values: 49
multiClassGroup. Non empty: 9	Unique values: 4
name. Non empty: 829	Unique values: 791
overload. Non empty: 9	Unique values: 4
playRequirements. Non empty: 81	Unique values: 34
playerClass. Non empty: 

In [69]:
data_authors.select('race').distinct().show()
data_authors.select('rarity').distinct().show()

+----------+
|      race|
+----------+
|MECHANICAL|
|    MURLOC|
|    DRAGON|
|     TOTEM|
|     BEAST|
|     DEMON|
|    PIRATE|
|      NULL|
+----------+

+---------+
|   rarity|
+---------+
|     FREE|
|     EPIC|
|     RARE|
|   COMMON|
|LEGENDARY|
|     NULL|
+---------+



In [70]:
str2int_rarity = {
    'FREE': 1,
    'COMMON': 2,
    'RARE': 3,
    'LEGENDARY': 4,
    'EPIC': 5,
    'NULL': 0
}

In [71]:
def rarity2int(rarity):
    return int(str2int_rarity.get(rarity, 0))

In [72]:
data_authors = data_authors.withColumn('rarity_int', udf(lambda x:rarity2int(x), IntegerType())(col('rarity')))

In [73]:
data_authors.groupby('race').agg(mean('rarity_int').alias('average rarity')).show()

+----------+------------------+
|      race|    average rarity|
+----------+------------------+
|MECHANICAL|3.0185185185185186|
|      NULL|2.6536502546689302|
|    MURLOC| 2.652173913043478|
|    DRAGON|3.2222222222222223|
|     TOTEM|               2.5|
|     BEAST|               2.3|
|     DEMON|2.3333333333333335|
|    PIRATE|2.8947368421052633|
+----------+------------------+



In [74]:
@udf(returnType=IntegerType())
def calculate_number_of_attacks(health, attack):
    if attack == 0:
        return -1
    return health // attack + (0 if health % attack == 0 else 1)

In [75]:
data_authors_2 = data_authors.withColumn('Attack to kill itself', calculate_number_of_attacks("health", "attack")).select('health', 'attack', 'Attack to kill itself')
data_authors_2.show(5)

+------+------+---------------------+
|health|attack|Attack to kill itself|
+------+------+---------------------+
|    10|    10|                    1|
|     5|     3|                    2|
|     3|     2|                    2|
|     2|     2|                    1|
|     4|     5|                    1|
+------+------+---------------------+
only showing top 5 rows



In [76]:
random_ids_left = list(range(data_authors.count() + 1))
random_ids_right = list(range(data_authors.count() + 1))

random.shuffle(random_ids_left)
random.shuffle(random_ids_right)

In [77]:
data_authors = data_authors.withColumn('unique_id', monotonically_increasing_id())

In [78]:
data_authors = data_authors.withColumn('left_id', udf(lambda x: random_ids_left[x], IntegerType())(col('unique_id')))
data_authors = data_authors.withColumn('right_id', udf(lambda x: random_ids_right[x], IntegerType())(col('unique_id'))).cache()

In [79]:
data_authors.count()

829

In [80]:
data_authors.show(10)

+--------------+------+---------+-------+-----------+--------------+----+-----+----------+-----+---------+--------+--------------------+------+---------+--------------------+--------------------+-------+-----------+---------------+-------------------+--------+--------------------+-----------+-----+---------+--------------+-------+-----------+------------------+--------------------+------+----------+---------+-------+--------+
|        artist|attack|cardClass|classes|collectible|collectionText|cost|dbfId|durability|elite|entourage| faction|              flavor|health|hideStats|           howToEarn|     howToEarnGolden|     id|  mechanics|multiClassGroup|               name|overload|    playRequirements|playerClass| race|   rarity|referencedTags|    set|spellDamage|targetingArrowText|                text|  type|rarity_int|unique_id|left_id|right_id|
+--------------+------+---------+-------+-----------+--------------+----+-----+----------+-----+---------+--------+--------------------+----

In [89]:
A = data_authors.alias("da1").join(data_authors.alias("da2"), col("da1.left_id") == col("da2.right_id"))

data_crossed = A.select(col("da1.unique_id").alias("left_id"),
                        col("da1.name"), col("da1.attack"),
                        col("da1.health"),
                        col("da2.unique_id").alias("right_id"),
                        col("da2.name").alias("right_name"),
                        col("da2.attack").alias("right_attack"),
                        col("da1.health").alias("right_health"))

In [90]:
data_crossed.show(10)

+-------+-------------------+------+------+--------+--------------------+------------+------------+
|left_id|               name|attack|health|right_id|          right_name|right_attack|right_health|
+-------+-------------------+------+------+--------+--------------------+------------+------------+
|      0|            Icehowl|    10|    10|     327|Medivh, the Guardian|           7|          10|
|      1|       Wailing Soul|     3|     5|     212|          Jade Golem|          18|           5|
|      2|  Sunfury Protector|     2|     3|     522|      Frigid Snobold|           2|           3|
|      3|       Dark Peddler|     2|     2|     228|     Enchanted Raven|           2|           2|
|      4|   Genzo, the Shark|     5|     4|     740|    Cruel Taskmaster|           2|           4|
|      5|   Reliquary Seeker|     1|     1|     575|           Runic Egg|           0|           1|
|      6|Injured Blademaster|     4|     7|      15|     River Crocolisk|           2|           7|


In [91]:
data_crossed = data_crossed.withColumn('Attacks to kill', calculate_number_of_attacks("attack", "right_health"))

In [94]:
data_crossed.filter(col('Attacks to kill') > 10).show(100)

+-------+----+------+------+--------+----------+------------+------------+---------------+
|left_id|name|attack|health|right_id|right_name|right_attack|right_health|Attacks to kill|
+-------+----+------+------+--------+----------+------------+------------+---------------+
+-------+----+------+------+--------+----------+------------+------------+---------------+



In [102]:
data_crossed.filter(col('Attacks to kill') > 3).show(100)

+-------+--------------+------+------+--------+--------------------+------------+------------+---------------+
|left_id|          name|attack|health|right_id|          right_name|right_attack|right_health|Attacks to kill|
+-------+--------------+------+------+--------+--------------------+------------+------------+---------------+
|    395|         Knife|     5|     1|     276|           Deathlord|           2|           1|              5|
|    713|One-eyed Cheat|     4|     1|     316|  Majordomo Executus|           9|           1|              4|
|    349|   Magma Rager|     5|     1|     443|Blackwing Technician|           2|           1|              5|
|    450|  Shadow Rager|     5|     1|     490|        Reno Jackson|           4|           1|              5|
|    720|      Duskboar|     4|     1|     590|      Gorillabot A-3|           3|           1|              4|
|    824|     Mini-Mage|     4|     1|     680|          Jade Golem|          27|           1|              4|
|