### Instalando e importando bibliotecas

In [1]:
!pip install pyspark



In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

### Definindo variáveis de path e nomes de arquivos dos datasets

In [3]:
input_path="C:\\Users\\Vandinho\\Documents\\Tecnologia\\Pos_BigData\\8_Analise_Spark\\infnet_projeto_hadoop\\input"
output_path="C:\\Users\\Vandinho\\Documents\\Tecnologia\\Pos_BigData\\8_Analise_Spark\\infnet_projeto_hadoop\\output"

In [4]:
digimon="Digimon.csv"
digivolutions="Digivolutions.csv"
skills="Skills.csv"
skills_by="Skills_by_Digimon.csv"

### Inicializando uma sessão do Spark

In [5]:
spark = SparkSession.builder.appName("Analise_Hadoop_PySpark").getOrCreate()

### Pré-processamento

### Digimon: Somente remover cabeçalho

In [59]:
df_digimon = spark.read \
    .option("delimiter", ";") \
    .csv(f"{input_path}\\{digimon}", header='true', inferSchema=True)

df_digimon = df_digimon \
    .withColumnRenamed("HP lvl 1", "HP_lvl_1") \
    .withColumnRenamed("SP lvl 1", "SP_lvl_1") \
    .withColumnRenamed("ATK lvl 1", "ATK_lvl_1") \
    .withColumnRenamed("DEF lvl 1", "DEF_lvl_1") \
    .withColumnRenamed("INT lvl 1", "INT_lvl_1") \
    .withColumnRenamed("SPD lvl 1", "SPD_lvl_1") \
    .withColumnRenamed("HP lvl 50", "HP_lvl_50") \
    .withColumnRenamed("SP lvl 50", "SP_lvl_50") \
    .withColumnRenamed("ATK lvl 50", "ATK_lvl_50") \
    .withColumnRenamed("DEF lvl 50", "DEF_lvl_50") \
    .withColumnRenamed("INT lvl 50", "INT_lvl_50") \
    .withColumnRenamed("SPD lvl 50", "SPD_lvl_50") \
    .withColumnRenamed("HP lvl 99", "HP_lvl_99") \
    .withColumnRenamed("SP lvl 99", "SP_lvl_99") \
    .withColumnRenamed("ATK lvl 99", "ATK_lvl_99") \
    .withColumnRenamed("DEF lvl 99", "DEF_lvl_99") \
    .withColumnRenamed("INT lvl 99", "INT_lvl_99") \
    .withColumnRenamed("SPD lvl 99", "SPD_lvl_99")

df_digimon.show(5)

+------+-------+-----+----+---------+------+-----------+--------+--------+---------+---------+---------+---------+---------+---------+----------+----------+----------+----------+---------+---------+----------+----------+----------+----------+
|Number|Digimon|Stage|Type|Attribute|Memory|Equip Slots|HP_lvl_1|SP_lvl_1|ATK_lvl_1|DEF_lvl_1|INT_lvl_1|SPD_lvl_1|HP_lvl_50|SP_lvl_50|ATK_lvl_50|DEF_lvl_50|INT_lvl_50|SPD_lvl_50|HP_lvl_99|SP_lvl_99|ATK_lvl_99|DEF_lvl_99|INT_lvl_99|SPD_lvl_99|
+------+-------+-----+----+---------+------+-----------+--------+--------+---------+---------+---------+---------+---------+---------+----------+----------+----------+----------+---------+---------+----------+----------+----------+----------+
|     1|Kuramon| Baby|Free|  Neutral|     2|          0|     150|      24|       30|       20|       19|       32|      590|       77|        79|        69|        68|        95|     1030|      131|       128|       118|       117|       159|
|     2|Pabumon| Baby|Free| 

In [18]:
print(f"Digimons - Tamanho: {df_digimon.count()}")

Digimons - Tamanho: 341


In [11]:
df_digimon.printSchema()

root
 |-- Number: integer (nullable = true)
 |-- Digimon: string (nullable = true)
 |-- Stage: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Attribute: string (nullable = true)
 |-- Memory: integer (nullable = true)
 |-- Equip Slots: integer (nullable = true)
 |-- HP lvl 1: integer (nullable = true)
 |-- SP lvl 1: integer (nullable = true)
 |-- ATK lvl 1: integer (nullable = true)
 |-- DEF lvl 1: integer (nullable = true)
 |-- INT lvl 1: integer (nullable = true)
 |-- SPD lvl 1: integer (nullable = true)
 |-- HP lvl 50: integer (nullable = true)
 |-- SP lvl 50: integer (nullable = true)
 |-- ATK lvl 50: integer (nullable = true)
 |-- DEF lvl 50: integer (nullable = true)
 |-- INT lvl 50: integer (nullable = true)
 |-- SPD lvl 50: integer (nullable = true)
 |-- HP lvl 99: integer (nullable = true)
 |-- SP lvl 99: integer (nullable = true)
 |-- ATK lvl 99: integer (nullable = true)
 |-- DEF lvl 99: integer (nullable = true)
 |-- INT lvl 99: integer (nullable = true)
 

### Digivolutions

In [40]:
df_digivolutions = spark.read \
    .option("delimiter", ";") \
    .csv(f"{input_path}\\{digivolutions}", header='true', inferSchema=True)

df_digivolutions = df_digivolutions \
    .withColumnRenamed("Digivolves from", "Digivolves_from") \
    .withColumnRenamed("Digivolves to", "Digivolves_to")

df_digivolutions.show(5)

+------+---------------+-----------------+
|Number|Digivolves_from|    Digivolves_to|
+------+---------------+-----------------+
|     1|        Kuramon|         Tsumemon|
|     1|        Kuramon|          Pagumon|
|     1|        Kuramon|Arcadiamon In-Tr.|
|     2|        Pabumon|          Tanemon|
|     2|        Pabumon|          Yokomon|
+------+---------------+-----------------+
only showing top 5 rows



In [41]:
print(f"Digivolutions - Tamanho antes da limpeza: {df_digivolutions.count()}")

Digivolutions - Tamanho antes da limpeza: 998


In [42]:
df_digivolutions = df_digivolutions.dropna(how="all")

print(f"Digivolutions - Tamanho após da limpeza: {df_digivolutions.count()}")

Digivolutions - Tamanho após da limpeza: 996


### Skills: Somente remover cabeçalho

In [69]:
df_skills = spark.read \
    .option("delimiter", ";") \
    .csv(f"{input_path}\\{skills}", header='true', inferSchema=True)

df_skills = df_skills \
    .withColumnRenamed("SP Cost", "SP_Cost")

df_skills.show(5)

+--------------------+-------+--------+-----+---------+-----------+--------------------+
|               Skill|SP_Cost|    Type|Power|Attribute|Inheritable|         Description|
+--------------------+-------+--------+-----+---------+-----------+--------------------+
|  Acceleration Boost|      6| Support|    0|  Neutral|        Yes|Doubles damage ou...|
|Adhesive Bubble Blow|      2|Physical|   25|    Plant|         No|Physical attack, ...|
|      Agility Charge|      6| Support|    0|  Neutral|        Yes|Increases EVA of ...|
|    Aguichant Levres|     40|   Magic|    0|    Light|         No|INT-penetrating s...|
|         Air Bubbles|      2|   Magic|   30|    Water|         No|Magic attack, 30 ...|
+--------------------+-------+--------+-----+---------+-----------+--------------------+
only showing top 5 rows



In [22]:
print(f"Skills - Tamanho: {df_skills.count()}")

Skills - Tamanho: 505


### Skills by Digimon

In [24]:
df_skills_by = spark.read \
    .option("delimiter", ";") \
    .csv(f"{input_path}\\{skills_by}", header='true', inferSchema=True)

df_skills_by.show(5)

+------+-------+--------------------+-----+
|Number|Digimon|               Skill|Level|
+------+-------+--------------------+-----+
|     1|Kuramon|           Glare Eye|    1|
|     2|Pabumon|Adhesive Bubble Blow|    1|
|     3|Punimon|         Bubble Blow|    1|
|     4|Botamon|         Bubble Blow|    1|
|     5|Poyomon|   Super Bubble Blow|    1|
+------+-------+--------------------+-----+
only showing top 5 rows



### Tratamento: Algumas linhas relativas ao digimon Coredramon (Blue) estão nulas na coluna 'Number', observe:

In [27]:
df_skills_by.filter(df_skills_by["Digimon"] == "Coredramon (Blue)").select("Number").show()

+------+
|Number|
+------+
|    96|
|    96|
|      |
|      |
+------+



### Vamos preenche-la com o número correto do digimon, que é 96, conforme as outras linhas

In [29]:
df_skills_by = df_skills_by \
    .withColumn("Number", when(df_skills_by["Digimon"] == "Coredramon (Blue)", 96) \
    .otherwise(df_skills_by["Number"]))

### Confirmando que o preenchimento da coluna foi feito

In [30]:
df_skills_by.filter(df_skills_by["Digimon"] == "Coredramon (Blue)").select("Number").show()

+------+
|Number|
+------+
|    96|
|    96|
|    96|
|    96|
+------+



### Executando as queries do Hive usando o PySpark

### Criando TempViews dos dataframes para acessa-los em forma de tabelas

In [70]:
df_digimon.createOrReplaceTempView("stg_digimon")
df_digivolutions.createOrReplaceTempView("stg_digivolutions")
df_skills.createOrReplaceTempView("stg_skills")
df_skills_by.createOrReplaceTempView("stg_skills_by_digimon")

### Fazendo as análises utilizando PySpark

In [32]:
# Quantos digimons existem no dataset?
r1 = spark.sql("SELECT count(*) AS qt_digimons FROM stg_digimon")

r1.show()

+-----------+
|qt_digimons|
+-----------+
|        341|
+-----------+



In [34]:
# Quantos digimons existem por estágio?
r2 = spark.sql("SELECT Stage, count(*) AS qt_digimons FROM stg_digimon GROUP BY Stage")

r2.show()

+-----------+-----------+
|      Stage|qt_digimons|
+-----------+-----------+
|   Ultimate|         78|
|       None|          2|
|       Baby|          5|
|      Ultra|         14|
|     Rookie|         50|
|   Champion|         79|
|      Armor|          3|
|       Mega|         98|
|In-Training|         12|
+-----------+-----------+



In [56]:
# Qual Digimon possui mais evoluções diferentes?
r3 = spark.sql("SELECT Digivolves_from, count(*) AS qt_digivolutions FROM stg_digivolutions GROUP BY Digivolves_from ORDER BY qt_digivolutions DESC")

r3.show(20)

+---------------+----------------+
|Digivolves_from|qt_digivolutions|
+---------------+----------------+
|         Impmon|               6|
|        Patamon|               6|
|        Hackmon|               6|
|       Solarmon|               6|
|      ToyAgumon|               6|
|        Guilmon|               6|
|       Otamamon|               6|
|        Bakemon|               6|
|   Agumon (Blk)|               6|
|       Tsunomon|               6|
|         Agumon|               6|
|        Renamon|               6|
|        Wormmon|               6|
|     Gigadramon|               6|
|      Birdramon|               6|
|        Koromon|               6|
|        Gabumon|               6|
|        Meramon|               6|
|        Elecmon|               6|
|         Gaomon|               6|
|         Palmon|               6|
|       Goblimon|               6|
|       Gotsumon|               6|
|         Lopmon|               6|
|        Betamon|               6|
|         Veemon|   

### Resposta: Podemos observar que vários digimons empatam com o número máximo de 6 evoluções.

In [61]:
# Top 5 digimons com o maior valor em HP
r4 = spark.sql("SELECT Digimon, HP_lvl_99 FROM stg_digimon GROUP BY Digimon, HP_lvl_99 ORDER BY HP_lvl_99 DESC LIMIT 5")

r4.show()

+---------------+---------+
|        Digimon|HP_lvl_99|
+---------------+---------+
|      Gankoomon|     2670|
|  ShogunGekomon|     2620|
|ShineGreymon BM|     2570|
|        Titamon|     2570|
|   ShineGreymon|     2470|
+---------------+---------+



In [63]:
# Top 5 digimons com o maior valor em ATK
r5 = spark.sql("SELECT Digimon, ATK_lvl_99 FROM stg_digimon GROUP BY Digimon, ATK_lvl_99 ORDER BY ATK_lvl_99 DESC LIMIT 5")

r5.show()

+------------+----------+
|     Digimon|ATK_lvl_99|
+------------+----------+
|    Chaosmon|       382|
|Armageddemon|       319|
|Belphemon RM|       311|
|  Diaboromon|       307|
| Chaosmon VA|       303|
+------------+----------+



In [64]:
# Top 5 digimons com o maior valor em DEF
r6 = spark.sql("SELECT Digimon, DEF_lvl_99 FROM stg_digimon GROUP BY Digimon, DEF_lvl_99 ORDER BY DEF_lvl_99 DESC LIMIT 5")

r6.show()

+---------------+----------+
|        Digimon|DEF_lvl_99|
+---------------+----------+
|PlatinumNumemon|       999|
|PlatinumSukamon|       999|
|  GroundLocomon|       277|
|       Magnamon|       272|
|      Craniamon|       272|
+---------------+----------+



In [65]:
# Top 5 digimons com o maior valor em SPD
r7 = spark.sql("SELECT Digimon, SPD_lvl_99 FROM stg_digimon GROUP BY Digimon, SPD_lvl_99 ORDER BY SPD_lvl_99 DESC LIMIT 5")

r7.show()

+-------------------+----------+
|            Digimon|SPD_lvl_99|
+-------------------+----------+
|      Leopardmon LM|       282|
|MagnaGarurumon (SV)|       282|
|         Ravemon BM|       277|
|            Ravemon|       267|
|   UlforceVeedramon|       257|
+-------------------+----------+



In [66]:
# Top 5 digimons que ocupam a maior quantidade de memória
r8 = spark.sql("SELECT Digimon, Memory FROM stg_digimon ORDER BY Memory DESC LIMIT 5")

r8.show()

+-----------------+------+
|          Digimon|Memory|
+-----------------+------+
|    Leopardmon LM|    25|
|Alphamon Ouryuken|    25|
|     Beelzemon BM|    25|
|     Armageddemon|    25|
| Arcadiamon Ultra|    25|
+-----------------+------+



In [71]:
# Qual técnica custa a maior quantidade de energia (SP)?
r9 = spark.sql("SELECT Skill FROM stg_skills ORDER BY SP_Cost DESC LIMIT 1")

r9.show()

+--------------------+
|               Skill|
+--------------------+
|Protect Wave (Awake)|
+--------------------+



In [73]:
# Quais digimons podem aprendê-la?
r10 = spark.sql("""
SELECT dg.Digimon FROM stg_digimon dg
INNER JOIN stg_skills_by_digimon skd
ON skd.Number = dg.Number
WHERE skd.Skill = 'Protect Wave (Awake)'
""")

r10.show()

+--------------------+
|             Digimon|
+--------------------+
|Sistermon B (Awake.)|
+--------------------+



In [74]:
# Top 3 técnicas mais fortes considerando o valor de poder (Power)
r11 = spark.sql("SELECT Skill, Power FROM stg_skills ORDER BY Power DESC LIMIT 3")

r11.show()

+----------------+-----+
|           Skill|Power|
+----------------+-----+
|  Atomic Blaster|  250|
|  Dragon Impulse|  250|
|Divine Atonement|  200|
+----------------+-----+



### Análises usando as funções do PySpark vistas em aula