# <sub> Pokemon Notebook - Amazon Athena </sub>
### Analyzing first-generation Pokemon and berries from the Pokemon world

#### Note: I wrote this notebook in the Amazon Athena notebook editor, so the Apache Spark environment (including the `spark` session used below) was already preconfigured

In [None]:
# Load both source CSVs from S3 into Spark DataFrames.
# Each file is read with its first row as the header and with Spark
# inferring the column types.

# raw pokemon records
pokemon_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('s3://nasrsolobucket/raw_pokemon/pokemon_data.csv')
)

# generation-1 pokemon type table
pokemon_types_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('s3://nasrsolobucket/pokemon_types_gen1/pokemon_generation_1.csv')
)

In [None]:
# Remove the columns that are not needed before joining:
#   '_c0'  from the pokemon data
#   'Name' from the types data (presumably redundant with the pokemon
#          data's own name column -- confirm against the CSV headers)
pokemon_df = pokemon_df.drop("_c0")
pokemon_types_df = pokemon_types_df.drop("Name")

# Inner-join the two DataFrames on their shared "id" column, so only ids
# present in both inputs are kept.
combined_df = pokemon_df.join(pokemon_types_df, "id", "inner")

# Preview the joined result.
combined_df.show()

In [None]:
# Print the schema (column names and data types) of the joined pokemon DataFrame.
combined_df.printSchema()

In [None]:
# Rescale the measurement columns in place by dividing the raw values by 10:
# weight becomes kilograms and height becomes metres.
for measurement in ("weight", "height"):
    combined_df = combined_df.withColumn(measurement, combined_df[measurement] / 10)

# Preview the rescaled data.
combined_df.show()

# Make sure the id column is stored as an integer, then confirm via the schema.
id_as_int = combined_df["id"].cast("integer")
combined_df = combined_df.withColumn("id", id_as_int)

combined_df.printSchema()

In [None]:
# Find the lightest pokemon(s) by weight and print their name and weight.

# Import the functions module under a namespace. `from ... import min` would
# replace Python's builtin min() for every later cell in the notebook.
from pyspark.sql import functions as F

# Smallest value in the weight column, pulled back to the driver as a scalar.
min_weight = combined_df.agg(F.min("weight").alias("min_weight")).collect()[0]["min_weight"]

# Keep every row that ties for the minimum, not just a single pokemon.
min_weight_rows = combined_df.filter(combined_df["weight"] == min_weight)

rows = min_weight_rows.collect()

for row in rows:
    print("pokemon: {},weight(Kg):{}".format(row['name'], row['weight']))

In [None]:
# Find the heaviest pokemon(s) by weight and print their name and weight.

# Import the functions module under a namespace. `from ... import max` would
# replace Python's builtin max() for every later cell in the notebook.
from pyspark.sql import functions as F

# Largest value in the weight column, pulled back to the driver as a scalar.
max_weight = combined_df.agg(F.max("weight").alias("max_weight")).collect()[0]["max_weight"]

# Keep every row that ties for the maximum, not just a single pokemon.
max_weight_row = combined_df.filter(combined_df["weight"] == max_weight)

rows = max_weight_row.collect()

for row in rows:
    print("pokemon: {}, weight(Kg):{}".format(row['name'], row['weight']))

In [None]:
# Find the tallest pokemon(s) by height and print their name and height.

# Import the functions module under a namespace. `from ... import max` would
# replace Python's builtin max() for every later cell in the notebook.
from pyspark.sql import functions as F

# Largest value in the height column, pulled back to the driver as a scalar.
max_height = combined_df.agg(F.max("height").alias("max_height")).collect()[0]["max_height"]

# Keep every row that ties for the maximum, not just a single pokemon.
max_height_row = combined_df.filter(combined_df["height"] == max_height)

rows = max_height_row.collect()

for row in rows:
    print("pokemon: {}, height(metres):{}".format(row['name'], row['height']))

In [None]:
# Show the five pokemon with the highest base_experience
# (the experience received from defeating them).

from pyspark.sql.functions import desc

# Order from highest to lowest base_experience, then display the top 5 rows.
by_experience = combined_df.orderBy(desc('base_experience'))
by_experience.show(5)

In [None]:
# Count how many pokemon there are for each "Type 1" value.
type_counts = combined_df.groupBy("Type 1").agg({"*": "count"})
type_counts.show()

# Keep only pokemon that have two types, i.e. rows where "Type 2" is not null.
has_second_type = combined_df["Type 2"].isNotNull()
filtered_types = combined_df.where(has_second_type)

filtered_types.show()

In [None]:
# Save the cleaned pokemon DataFrame to S3 as CSV with a header row.
# "overwrite" mode replaces any output already at the target path.
(
    combined_df.write
    .mode("overwrite")
    .option("header", "true")
    .csv("s3://nasrsolobucket/cleaned_pokemon")
)

In [None]:
# Read pokeBerry_data.csv (the raw berries data) from S3 into a Spark DataFrame.
# header=True takes the column names from the first row; inferSchema=True lets
# Spark infer the column types.

berries_df = spark.read.csv('s3://nasrsolobucket/raw_berries/pokeBerry_data.csv',header=True,inferSchema=True)

In [None]:
# Drop the '_c0' column from the berries data, then preview the first five rows.
berries_df = berries_df.drop("_c0")

berries_df.show(n=5)

In [None]:
# Print the schema (column names and data types) of the berries DataFrame.

berries_df.printSchema()

In [None]:
# Average of the growth_time column: the time, in hours, it takes a tree to grow one stage.

berries_df.agg({"growth_time":"avg"}).show()

In [None]:
# Largest value in the max_harvest column: the maximum number of berries
# that can grow on one tree at a time.

berries_df.agg({"max_harvest":"max"}).show()

In [None]:
# Find the berry (or berries) with the biggest size and print name and size.

# Import the functions module under a namespace. `from ... import max` would
# replace Python's builtin max() for every later cell in the notebook.
from pyspark.sql import functions as F

# Largest value in the size column, pulled back to the driver as a scalar.
biggest_berries = berries_df.agg(F.max("size").alias("max_size")).collect()[0]["max_size"]

# Keep every row that ties for the maximum size, not just a single berry.
biggest_berries_rows = berries_df.filter(berries_df["size"] == biggest_berries)

rows = biggest_berries_rows.collect()

for row in rows:
    print("berries: {},size(millimetres):{}".format(row['name'], row['size']))

In [None]:
# Show the berry (or berries) with the highest soil_dryness value,
# i.e. the ones reported below as drying out the quickest.

# The aggregate yields a single row with a single column: the maximum soil_dryness.
driest_value = berries_df.agg({"soil_dryness": "max"}).collect()[0][0]

# Keep every berry that ties for that maximum.
berries = berries_df.where(berries_df["soil_dryness"] == driest_value)

print('berry which dries out the quickest')
berries.show()

In [None]:
# Save the cleaned berries DataFrame to S3 as CSV with a header row.
# "overwrite" mode replaces any output already at the target path.
(
    berries_df.write
    .mode("overwrite")
    .option("header", "true")
    .csv("s3://nasrsolobucket/cleaned_berries")
)