# Processing Pokémon Data With Apache Spark

by Max Woolf (@minimaxir)

*This notebook is licensed under the MIT License. If you use the code or data visualization designs contained within this notebook, it would be greatly appreciated if proper attribution is given back to this notebook and/or myself. Thanks! :)*

In [1]:
# Easiest way to get Spark to work with Jupyter: https://github.com/minrk/findspark
import findspark
findspark.init('/Users/maxwoolf/Desktop/spark-2.0.0-bin-hadoop2.7')

import pyspark
from pyspark.sql import SparkSession
sc = pyspark.SparkContext(appName="pokemon_pca")
spark = SparkSession(sc)

Web URL: local[*]:4040


In [25]:
config = sc._conf.getAll()
print 'Web URL: ' + filter(lambda x: 'spark.driver.host' in x[0], config)[0][1] + ':4040'

Web URL: 10.0.1.11:4040


Load and cache all necessary DataFrames.

In [2]:
csv_params = {'header': True, 'inferSchema': True}

df_pokemon = spark.read.csv('pokemon_data/pokemon.csv', header=True, inferSchema=True).cache()
df_stats = spark.read.csv('pokemon_data/pokemon_stats.csv', header=True, inferSchema=True).cache()
df_types = spark.read.csv('pokemon_data/pokemon_types.csv', header=True, inferSchema=True).cache()
df_moves = spark.read.csv('pokemon_data/pokemon_moves.csv', header=True, inferSchema=True).cache()
df_abilities = spark.read.csv('pokemon_data/pokemon_abilities.csv', header=True, inferSchema=True).cache()
df_species = spark.read.csv('pokemon_data/pokemon_species.csv', header=True, inferSchema=True).cache()

### Height/Weight

Normalize Pokemon Height/Weight to [0,1] by dividing by Max Height/Weight.

In [3]:
df_pokemon.show(6)
df_pokemon.printSchema()

+---+----------+----------+------+------+---------------+-----+----------+
| id|identifier|species_id|height|weight|base_experience|order|is_default|
+---+----------+----------+------+------+---------------+-----+----------+
|  1| bulbasaur|         1|     7|    69|             64|    1|         1|
|  2|   ivysaur|         2|    10|   130|            142|    2|         1|
|  3|  venusaur|         3|    20|  1000|            236|    3|         1|
|  4|charmander|         4|     6|    85|             62|    5|         1|
|  5|charmeleon|         5|    11|   190|            142|    6|         1|
|  6| charizard|         6|    17|   905|            240|    7|         1|
+---+----------+----------+------+------+---------------+-----+----------+
only showing top 6 rows

root
 |-- id: integer (nullable = true)
 |-- identifier: string (nullable = true)
 |-- species_id: integer (nullable = true)
 |-- height: integer (nullable = true)
 |-- weight: integer (nullable = true)
 |-- base_experience: 

In [27]:
max_height = df_pokemon.agg({"height": "max"}).first()[0]
max_weight = df_pokemon.agg({"weight": "max"}).first()[0]

df_pokemon_t = (df_pokemon
                .filter(df_pokemon.id < 10000)
              .withColumn('height_t', df_pokemon.height / max_height)
              .withColumn('weight_t', df_pokemon.weight / max_weight)
                .select(['id','height_t', 'weight_t'])
              )

df_pokemon_t.show(6)

+---+--------------------+--------------------+
| id|            height_t|            weight_t|
+---+--------------------+--------------------+
|  1| 0.04827586206896552|0.006902070621186...|
|  2| 0.06896551724137931|0.013003901170351105|
|  3| 0.13793103448275862| 0.10003000900270081|
|  4|0.041379310344827586|0.008502550765229568|
|  5| 0.07586206896551724|0.019005701710513155|
|  6| 0.11724137931034483| 0.09052715814744423|
+---+--------------------+--------------------+
only showing top 6 rows



### Base Stats (Att/Def/etc.)

Split each stat into its own column, then run the same scaler

In [5]:
df_stats.show(12)
df_stats.printSchema()

+----------+-------+---------+------+
|pokemon_id|stat_id|base_stat|effort|
+----------+-------+---------+------+
|         1|      1|       45|     0|
|         1|      2|       49|     0|
|         1|      3|       49|     0|
|         1|      4|       65|     1|
|         1|      5|       65|     0|
|         1|      6|       45|     0|
|         2|      1|       60|     0|
|         2|      2|       62|     0|
|         2|      3|       63|     0|
|         2|      4|       80|     1|
|         2|      5|       80|     1|
|         2|      6|       60|     0|
+----------+-------+---------+------+
only showing top 12 rows

root
 |-- pokemon_id: integer (nullable = true)
 |-- stat_id: integer (nullable = true)
 |-- base_stat: integer (nullable = true)
 |-- effort: integer (nullable = true)



Aggregate each `base_stat` into an `array` of length 6 for each Pokemon. Requires stepping down to RDDs.

Extract each row as a tuple, group by key to get array group.

In [6]:
df_stats.orderBy('pokemon_id', 'stat_id').show()

+----------+-------+---------+------+
|pokemon_id|stat_id|base_stat|effort|
+----------+-------+---------+------+
|         1|      1|       45|     0|
|         1|      2|       49|     0|
|         1|      3|       49|     0|
|         1|      4|       65|     1|
|         1|      5|       65|     0|
|         1|      6|       45|     0|
|         2|      1|       60|     0|
|         2|      2|       62|     0|
|         2|      3|       63|     0|
|         2|      4|       80|     1|
|         2|      5|       80|     1|
|         2|      6|       60|     0|
|         3|      1|       80|     0|
|         3|      2|       82|     0|
|         3|      3|       83|     0|
|         3|      4|      100|     2|
|         3|      5|      100|     1|
|         3|      6|       80|     0|
|         4|      1|       39|     0|
|         4|      2|       52|     0|
+----------+-------+---------+------+
only showing top 20 rows



In [7]:
df_stats_t = (df_stats
                .orderBy('pokemon_id', 'stat_id')   # Ensure correct order
                .rdd
                .map(lambda row: (row[0], row[2]))
                .groupByKey()
                .map(lambda row: (row[0], list(row[1]))) 
                .map(lambda row: (row[0], row[1][0], row[1][1], row[1][2], row[1][3], row[1][4], row[1][5]))
                .toDF()
                .orderBy('_1')
                .cache()
                )

df_stats_t.show(12)

+---+---+---+---+---+---+---+
| _1| _2| _3| _4| _5| _6| _7|
+---+---+---+---+---+---+---+
|  1| 45| 49| 49| 65| 65| 45|
|  2| 60| 62| 63| 80| 80| 60|
|  3| 80| 82| 83|100|100| 80|
|  4| 39| 52| 43| 60| 50| 65|
|  5| 58| 64| 58| 80| 65| 80|
|  6| 78| 84| 78|109| 85|100|
|  7| 44| 48| 65| 50| 64| 43|
|  8| 59| 63| 80| 65| 80| 58|
|  9| 79| 83|100| 85|105| 78|
| 10| 45| 30| 35| 20| 20| 45|
| 11| 50| 20| 55| 25| 25| 30|
| 12| 60| 45| 50| 90| 80| 70|
+---+---+---+---+---+---+---+
only showing top 12 rows



Rename the columns. Unfortunately, bulk renames annoying in Spark.

In [8]:
# http://stackoverflow.com/a/34077809
# via stats.csv
col_names = df_stats_t.schema.names
stat_names = ['id', 'hp', 'attack', 'defense', 'special_attack', 'special_defense', 'speed']


df_stats_t = reduce(lambda data, idx: data.withColumnRenamed(col_names[idx], stat_names[idx]), xrange(len(col_names)), df_stats_t)
df_stats_t.show(12)

+---+---+------+-------+--------------+---------------+-----+
| id| hp|attack|defense|special_attack|special_defense|speed|
+---+---+------+-------+--------------+---------------+-----+
|  1| 45|    49|     49|            65|             65|   45|
|  2| 60|    62|     63|            80|             80|   60|
|  3| 80|    82|     83|           100|            100|   80|
|  4| 39|    52|     43|            60|             50|   65|
|  5| 58|    64|     58|            80|             65|   80|
|  6| 78|    84|     78|           109|             85|  100|
|  7| 44|    48|     65|            50|             64|   43|
|  8| 59|    63|     80|            65|             80|   58|
|  9| 79|    83|    100|            85|            105|   78|
| 10| 45|    30|     35|            20|             20|   45|
| 11| 50|    20|     55|            25|             25|   30|
| 12| 60|    45|     50|            90|             80|   70|
+---+---+------+-------+--------------+---------------+-----+
only sho

Scale each column to [0,1].

In [9]:
max_hp = df_stats_t.selectExpr('max(hp)').first()[0]
max_attack = df_stats_t.selectExpr('max(attack)').first()[0]
max_defense = df_stats_t.selectExpr('max(defense)').first()[0]
max_special_attack = df_stats_t.selectExpr('max(special_attack)').first()[0]
max_special_defense = df_stats_t.selectExpr('max(special_defense)').first()[0]
max_speed = df_stats_t.selectExpr('max(speed)').first()[0]

df_stats_t = (df_stats_t
              .withColumn('hp_t', df_stats_t.hp / max_hp)
              .withColumn('attack_t', df_stats_t.attack / max_attack)
              .withColumn('defense_t', df_stats_t.defense / max_defense)
              .withColumn('special_attack_t', df_stats_t.special_attack / max_special_attack)
              .withColumn('special_defense_t', df_stats_t.special_defense / max_special_defense)
              .withColumn('speed_t', df_stats_t.speed / max_speed)
              .select(['id', 'attack_t', 'defense_t', 'special_attack_t', 'special_defense_t', 'speed_t'])
              )

df_stats_t.show()

+---+-------------------+-------------------+-------------------+-------------------+-------------------+
| id|           attack_t|          defense_t|   special_attack_t|  special_defense_t|            speed_t|
+---+-------------------+-------------------+-------------------+-------------------+-------------------+
|  1| 0.2578947368421053|0.21304347826086956|0.33505154639175255| 0.2826086956521739|               0.25|
|  2| 0.3263157894736842|0.27391304347826084|0.41237113402061853|0.34782608695652173| 0.3333333333333333|
|  3|0.43157894736842106|0.36086956521739133| 0.5154639175257731|0.43478260869565216| 0.4444444444444444|
|  4| 0.2736842105263158|0.18695652173913044|0.30927835051546393|0.21739130434782608| 0.3611111111111111|
|  5| 0.3368421052631579|0.25217391304347825|0.41237113402061853| 0.2826086956521739| 0.4444444444444444|
|  6| 0.4421052631578947| 0.3391304347826087| 0.5618556701030928| 0.3695652173913043| 0.5555555555555556|
|  7|0.25263157894736843| 0.2826086956521739|0

## Binarize

Calculate binary features for the rest of the datasets. Violates DRY heavily, but cannot be helped due to quirks.

### Types

In [10]:
df_types.show(12)
df_types.printSchema()

+----------+-------+----+
|pokemon_id|type_id|slot|
+----------+-------+----+
|         1|     12|   1|
|         1|      4|   2|
|         2|     12|   1|
|         2|      4|   2|
|         3|     12|   1|
|         3|      4|   2|
|         4|     10|   1|
|         5|     10|   1|
|         6|     10|   1|
|         6|      3|   2|
|         7|     11|   1|
|         8|     11|   1|
+----------+-------+----+
only showing top 12 rows

root
 |-- pokemon_id: integer (nullable = true)
 |-- type_id: integer (nullable = true)
 |-- slot: integer (nullable = true)



IDs must be zero-indexed for `OneHotEncoder`. Since IDs are just one-indexed, we can just subtract 1 from the IDs.

In [11]:
df_types_t = df_types.select('pokemon_id', (df_types.type_id-1).alias('type_t'))

df_types_t.show(12)

+----------+------+
|pokemon_id|type_t|
+----------+------+
|         1|    11|
|         1|     3|
|         2|    11|
|         2|     3|
|         3|    11|
|         3|     3|
|         4|     9|
|         5|     9|
|         6|     9|
|         6|     2|
|         7|    10|
|         8|    10|
+----------+------+
only showing top 12 rows



Now run OHE.

In [12]:
#from pyspark.ml.feature import OneHotEncoder
#
#ohe = OneHotEncoder(inputCol="type_t", outputCol="type_v")
#df_types_t = ohe.transform(df_types_t)
#
#df_types_t.show()

Run a pseudo-`OneHotEncoder` by specifying the type indices directly to the `SparseVector`. This approach does not drop the last category, which is acceptable since we are not generating a predictive model.

If a Pokemon has 2 types, there are two values/indices in each `SparseVector`.

In [13]:
# http://stackoverflow.com/questions/32981875/how-to-add-two-sparse-vectors-in-spark-using-python
from pyspark.ml.linalg import Vectors, SparseVector

# Given an array of indices, create a SparseVector

def array_to_sparse(size, v):
    values = {i: 1.0 for i in v}
    return Vectors.sparse(size, values)

sparse_size = df_types_t.selectExpr('count(distinct(type_t))').first()[0]

df_types_t = (df_types_t
            .rdd
            .map(lambda row: (row[0], row[1]))
            .groupByKey()
            .map(lambda row: (row[0], array_to_sparse(sparse_size, row[1])))
            .toDF()
            .orderBy('_1')
            .withColumnRenamed('_1', 'id')
            .withColumnRenamed('_2', 'type_t')
        )

df_types_t.show()

+---+--------------------+
| id|              type_t|
+---+--------------------+
|  1|(18,[3,11],[1.0,1...|
|  2|(18,[3,11],[1.0,1...|
|  3|(18,[3,11],[1.0,1...|
|  4|      (18,[9],[1.0])|
|  5|      (18,[9],[1.0])|
|  6|(18,[2,9],[1.0,1.0])|
|  7|     (18,[10],[1.0])|
|  8|     (18,[10],[1.0])|
|  9|     (18,[10],[1.0])|
| 10|      (18,[6],[1.0])|
| 11|      (18,[6],[1.0])|
| 12|(18,[2,6],[1.0,1.0])|
| 13|(18,[3,6],[1.0,1.0])|
| 14|(18,[3,6],[1.0,1.0])|
| 15|(18,[3,6],[1.0,1.0])|
| 16|(18,[0,2],[1.0,1.0])|
| 17|(18,[0,2],[1.0,1.0])|
| 18|(18,[0,2],[1.0,1.0])|
| 19|      (18,[0],[1.0])|
| 20|      (18,[0],[1.0])|
+---+--------------------+
only showing top 20 rows



### Moves

In [14]:
df_moves.show(20)
df_moves.printSchema()

+----------+----------------+-------+----------------------+-----+-----+
|pokemon_id|version_group_id|move_id|pokemon_move_method_id|level|order|
+----------+----------------+-------+----------------------+-----+-----+
|         1|               1|     14|                     4|    0| null|
|         1|               1|     15|                     4|    0| null|
|         1|               1|     22|                     1|   13| null|
|         1|               1|     33|                     1|    1|    1|
|         1|               1|     34|                     4|    0| null|
|         1|               1|     36|                     4|    0| null|
|         1|               1|     38|                     4|    0| null|
|         1|               1|     45|                     1|    1|    2|
|         1|               1|     72|                     4|    0| null|
|         1|               1|     73|                     1|    7| null|
|         1|               1|     74|              

In [15]:
df_moves_t = (df_moves
              .filter(df_moves.move_id < 10000)   # Remove Shadow Moves exclusive to Coliseum/XD
              .select('pokemon_id', (df_moves.move_id-1).alias('move_t'))
              )
              
sparse_size = df_moves_t.selectExpr('count(distinct(move_t))').first()[0]

df_moves_t = (df_moves_t
            .rdd
            .map(lambda row: (row[0], row[1]))
            .groupByKey()
            .map(lambda row: (row[0], array_to_sparse(sparse_size, row[1])))
            .toDF()
            .orderBy('_1')
            .withColumnRenamed('_1', 'id')
            .withColumnRenamed('_2', 'move_t')
        )

df_moves_t.show()

+---+--------------------+
| id|              move_t|
+---+--------------------+
|  1|(613,[12,13,14,19...|
|  2|(613,[13,14,19,21...|
|  3|(613,[13,14,19,21...|
|  4|(613,[4,6,8,9,13,...|
|  5|(613,[4,6,8,9,13,...|
|  6|(613,[4,6,8,9,13,...|
|  7|(613,[4,7,24,28,3...|
|  8|(613,[4,7,24,28,3...|
|  9|(613,[4,7,24,28,3...|
| 10|(613,[32,80,172,4...|
| 11|(613,[80,105,333,...|
| 12|(613,[12,15,17,35...|
| 13|(613,[39,80,449,5...|
| 14|(613,[80,105,333,...|
| 15|(613,[13,14,30,35...|
| 16|(613,[12,15,16,17...|
| 17|(613,[12,15,16,17...|
| 18|(613,[12,15,16,17...|
| 19|(613,[14,28,32,33...|
| 20|(613,[13,14,28,32...|
+---+--------------------+
only showing top 20 rows



### Abilities

In [16]:
df_abilities.show(20)
df_abilities.printSchema()

+----------+----------+---------+----+
|pokemon_id|ability_id|is_hidden|slot|
+----------+----------+---------+----+
|         1|        65|        0|   1|
|         1|        34|        1|   3|
|         2|        65|        0|   1|
|         2|        34|        1|   3|
|         3|        65|        0|   1|
|         3|        34|        1|   3|
|         4|        66|        0|   1|
|         4|        94|        1|   3|
|         5|        66|        0|   1|
|         5|        94|        1|   3|
|         6|        66|        0|   1|
|         6|        94|        1|   3|
|         7|        67|        0|   1|
|         7|        44|        1|   3|
|         8|        67|        0|   1|
|         8|        44|        1|   3|
|         9|        67|        0|   1|
|         9|        44|        1|   3|
|        10|        19|        0|   1|
|        10|        50|        1|   3|
+----------+----------+---------+----+
only showing top 20 rows

root
 |-- pokemon_id: integer (nullabl

In [17]:
df_abilities_t = (df_abilities
              .filter(df_abilities.ability_id < 10000)   # Invalid
              .select('pokemon_id', (df_abilities.ability_id-1).alias('ability_t'))
              )
              
sparse_size = df_abilities_t.selectExpr('count(distinct(ability_t))').first()[0]

df_abilities_t = (df_abilities_t
            .rdd
            .map(lambda row: (row[0], row[1]))
            .groupByKey()
            .map(lambda row: (row[0], array_to_sparse(sparse_size, row[1])))
            .toDF()
            .orderBy('_1')
            .withColumnRenamed('_1', 'id')
            .withColumnRenamed('_2', 'ability_t')
        )

df_abilities_t.show()

+---+--------------------+
| id|           ability_t|
+---+--------------------+
|  1|(191,[33,64],[1.0...|
|  2|(191,[33,64],[1.0...|
|  3|(191,[33,64],[1.0...|
|  4|(191,[65,93],[1.0...|
|  5|(191,[65,93],[1.0...|
|  6|(191,[65,93],[1.0...|
|  7|(191,[43,66],[1.0...|
|  8|(191,[43,66],[1.0...|
|  9|(191,[43,66],[1.0...|
| 10|(191,[18,49],[1.0...|
| 11|    (191,[60],[1.0])|
| 12|(191,[13,109],[1....|
| 13|(191,[18,49],[1.0...|
| 14|    (191,[60],[1.0])|
| 15|(191,[67,96],[1.0...|
| 16|(191,[50,76,144],...|
| 17|(191,[50,76,144],...|
| 18|(191,[50,76,144],...|
| 19|(191,[49,54,61],[...|
| 20|(191,[49,54,61],[...|
+---+--------------------+
only showing top 20 rows



### Species

We can extract several `color`, `shape`, and `habitat` here.

In [18]:
df_species.show(6)
df_species.printSchema()

+---+----------+-------------+-----------------------+------------------+--------+--------+----------+-----------+------------+--------------+-------+-------------+----------------------+--------------+----------------+-----+--------------+
| id|identifier|generation_id|evolves_from_species_id|evolution_chain_id|color_id|shape_id|habitat_id|gender_rate|capture_rate|base_happiness|is_baby|hatch_counter|has_gender_differences|growth_rate_id|forms_switchable|order|conquest_order|
+---+----------+-------------+-----------------------+------------------+--------+--------+----------+-----------+------------+--------------+-------+-------------+----------------------+--------------+----------------+-----+--------------+
|  1| bulbasaur|            1|                   null|                 1|       5|       8|         3|          1|          45|            70|      0|           20|                     0|             4|               0|    1|          null|
|  2|   ivysaur|            1|      

In [19]:
df_species_t = (df_species 
              .select('id',
                      (df_species.color_id-1).alias('color_t'),
                      (df_species.shape_id-1).alias('shape_t'),
                      (df_species.habitat_id-1).alias('habitat_t'))
                .fillna({'habitat_t': 9}) # Empty for some pokemon
              )
              
sparse_size_color = df_species_t.selectExpr('count(distinct(color_t))').first()[0]
sparse_size_shape = df_species_t.selectExpr('count(distinct(shape_t))').first()[0]
sparse_size_habitat = df_species_t.selectExpr('count(distinct(habitat_t))').first()[0]

df_species_t = (df_species_t
            .rdd
            .map(lambda row: (row[0],
                              #row[1],
                              Vectors.sparse(sparse_size_color, {row[1]: 1.0}),
                              Vectors.sparse(sparse_size_shape, {row[2]: 1.0}),
                              Vectors.sparse(sparse_size_habitat, {row[3]: 1.0}),
                             ))
            .toDF()
            .orderBy('_1')
            .withColumnRenamed('_1', 'id')
            .withColumnRenamed('_2', 'color_t')
            .withColumnRenamed('_3', 'shape_t')
            .withColumnRenamed('_4', 'habitat_t')
        )

df_species_t.show()

+---+--------------+---------------+--------------+
| id|       color_t|        shape_t|     habitat_t|
+---+--------------+---------------+--------------+
|  1|(10,[4],[1.0])| (14,[7],[1.0])|(10,[2],[1.0])|
|  2|(10,[4],[1.0])| (14,[7],[1.0])|(10,[2],[1.0])|
|  3|(10,[4],[1.0])| (14,[7],[1.0])|(10,[2],[1.0])|
|  4|(10,[7],[1.0])| (14,[5],[1.0])|(10,[3],[1.0])|
|  5|(10,[7],[1.0])| (14,[5],[1.0])|(10,[3],[1.0])|
|  6|(10,[7],[1.0])| (14,[5],[1.0])|(10,[3],[1.0])|
|  7|(10,[1],[1.0])| (14,[5],[1.0])|(10,[8],[1.0])|
|  8|(10,[1],[1.0])| (14,[5],[1.0])|(10,[8],[1.0])|
|  9|(10,[1],[1.0])| (14,[5],[1.0])|(10,[8],[1.0])|
| 10|(10,[4],[1.0])| (14,[1],[1.0])|(10,[1],[1.0])|
| 11|(10,[4],[1.0])| (14,[1],[1.0])|(10,[1],[1.0])|
| 12|(10,[8],[1.0])|(14,[12],[1.0])|(10,[1],[1.0])|
| 13|(10,[2],[1.0])| (14,[1],[1.0])|(10,[1],[1.0])|
| 14|(10,[9],[1.0])| (14,[1],[1.0])|(10,[1],[1.0])|
| 15|(10,[9],[1.0])|(14,[12],[1.0])|(10,[1],[1.0])|
| 16|(10,[2],[1.0])| (14,[8],[1.0])|(10,[1],[1.0])|
| 17|(10,[2]

## Finale

Join and combine EVERYTHING!

In [34]:
df_combined = (df_pokemon_t
               .join(df_stats_t, 'id', 'left')
               .join(df_types_t, 'id', 'left')
               .join(df_moves_t, 'id', 'left')
               .join(df_abilities_t, 'id', 'left')
               .join(df_species_t, 'id', 'left')
               .cache()
               )

df_combined.printSchema()

root
 |-- id: integer (nullable = true)
 |-- height_t: double (nullable = true)
 |-- weight_t: double (nullable = true)
 |-- attack_t: double (nullable = true)
 |-- defense_t: double (nullable = true)
 |-- special_attack_t: double (nullable = true)
 |-- special_defense_t: double (nullable = true)
 |-- speed_t: double (nullable = true)
 |-- type_t: vector (nullable = true)
 |-- move_t: vector (nullable = true)
 |-- ability_t: vector (nullable = true)
 |-- color_t: vector (nullable = true)
 |-- shape_t: vector (nullable = true)
 |-- habitat_t: vector (nullable = true)



Before combining vectors, all vectors must be indexed with `VectorIndexer`

In [36]:
# http://stackoverflow.com/a/32984795
from pyspark.ml.feature import VectorIndexer, VectorAssembler

#col_names = df_combined.schema.names[1:]

for col_name in df_combined.schema.names[8:]:
    vector_indexer = VectorIndexer(inputCol=col_name, outputCol=col_name+'v')

    df_combined = (vector_indexer
                    .fit(df_combined)
                    .transform(df_combined)
                       .drop(col_name))

df_combined.printSchema()

root
 |-- id: integer (nullable = true)
 |-- height_t: double (nullable = true)
 |-- weight_t: double (nullable = true)
 |-- attack_t: double (nullable = true)
 |-- defense_t: double (nullable = true)
 |-- special_attack_t: double (nullable = true)
 |-- special_defense_t: double (nullable = true)
 |-- speed_t: double (nullable = true)
 |-- type_tv: vector (nullable = true)
 |-- move_tv: vector (nullable = true)
 |-- ability_tv: vector (nullable = true)
 |-- color_tv: vector (nullable = true)
 |-- shape_tv: vector (nullable = true)
 |-- habitat_tv: vector (nullable = true)



Combine all the values into a single `features` column.

In [44]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=df_combined.schema.names[1:], outputCol="features")
df_features = (assembler
               .transform(df_combined)
               .select(['id', 'features'])
               .orderBy('id')
               .cache()
               )

df_features.show(5)

+---+--------------------+
| id|            features|
+---+--------------------+
|  1|(863,[0,1,2,3,4,5...|
|  2|(863,[0,1,2,3,4,5...|
|  3|(863,[0,1,2,3,4,5...|
|  4|(863,[0,1,2,3,4,5...|
|  5|(863,[0,1,2,3,4,5...|
+---+--------------------+
only showing top 5 rows



In [45]:
from pyspark.ml.feature import PCA

pca = PCA(k=50, inputCol="features", outputCol="pca_features")
model = pca.fit(df_features)
df_features = model.transform(df_features)

df_features.show(5)
print sum(model.explainedVariance[0:2])   # % of model explained by first 3 components

+---+--------------------+--------------------+
| id|            features|        pca_features|
+---+--------------------+--------------------+
|  1|(863,[0,1,2,3,4,5...|[-1.2328845199078...|
|  2|(863,[0,1,2,3,4,5...|[-1.2587694818593...|
|  3|(863,[0,1,2,3,4,5...|[-1.8688853599180...|
|  4|(863,[0,1,2,3,4,5...|[-5.5015619452006...|
|  5|(863,[0,1,2,3,4,5...|[-5.3619352981827...|
+---+--------------------+--------------------+
only showing top 5 rows

0.127711510102


Attempted to expand to polynomial space to see if we get better model. However, `863^2/2 ~ 372384 ` and PCA feature limit is apparently `65535`. Code left for posterity.

In [None]:
#from pyspark.ml.feature import PolynomialExpansion

#poly = PolynomialExpansion(degree=2, inputCol="features", outputCol="polyfeatures")
#df_features = poly.transform(df_features)
#
#pca = PCA(k=50, inputCol="polyfeatures", outputCol="pca_polyfeatures")
#model = pca.fit(df_features)
#df_features = model.transform(df_features)
#
#df_features.show(5)
#print sum(model.explainedVariance[0:2])   # % of model explained by first 3 components

Export as CSV. Spark csv export is apparently ugly due to the `DenseVectors`, so use `pandas` CSV export instead.

In [64]:
#df_features.select('id',df_features.pca_features).repartition(1).write.format('csv').save("pokemon_pca", header=True, mode="overwrite")
#df_features.select('id','pca_features').coalesce(1).write.format('json').save("pokemon_pca", header=True, mode="overwrite")

import pandas as pd
df_features.select('id','pca_features').toPandas().to_csv('pokemon_pca.csv', index=False)

# The MIT License (MIT)

Copyright (c) 2016 Max Woolf

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.