In [0]:
# Install necessary libs
#%pip install tqdm

Collecting tqdm
  Obtaining dependency information for tqdm from https://files.pythonhosted.org/packages/d0/30/dc54f88dd4a2b5dc8a0279bdd7270e735851848b762aeb1c1184ed1f6b14/tqdm-4.67.1-py3-none-any.whl.metadata
  Using cached tqdm-4.67.1-py3-none-any.whl.metadata (57 kB)
Using cached tqdm-4.67.1-py3-none-any.whl (78 kB)
Installing collected packages: tqdm
Successfully installed tqdm-4.67.1
[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
# Import the required libraries
import json
import time
from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ThreadPool

import requests
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, BooleanType
from tqdm import tqdm

# Initialize Spark session. getOrCreate() reuses the active session when one
# already exists and only builds a new one otherwise, so re-running this cell is safe.
spark = SparkSession.builder.getOrCreate()

In [0]:
# Function to fetch data
def fetch_data(url, timeout=10):
    """Fetch one Pokémon resource and return its core scalar attributes.

    Args:
        url: Full URL of a single Pokémon resource,
            e.g. ``https://pokeapi.co/api/v2/pokemon/1/``.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, ``requests`` may block indefinitely on a stalled
            connection.

    Returns:
        dict with the keys ``id``, ``name``, ``base_experience``, ``height``
        and ``weight``. A key that is absent from the payload maps to ``None``.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.RequestException: On connection failures or timeouts.
    """
    response = requests.get(url, timeout=timeout)
    # Fail loudly on HTTP errors rather than parsing an error body as data
    response.raise_for_status()
    payload = response.json()

    return {
        "id": payload.get("id"),
        "name": payload.get("name"),
        "base_experience": payload.get("base_experience"),
        "height": payload.get("height"),
        "weight": payload.get("weight"),
        # Add other fields as needed
    }

# Small sample of Pokémon resource URLs (IDs 1-3) used to try out the pipeline
urls = [
    "https://pokeapi.co/api/v2/pokemon/1/",
    "https://pokeapi.co/api/v2/pokemon/2/",
    "https://pokeapi.co/api/v2/pokemon/3/",
]

# Fetch concurrently, with tqdm reporting progress. A thread pool is used instead
# of a process pool: the work is I/O-bound HTTP, threads need no pickling of the
# notebook-defined fetch_data (which fails under the "spawn" start method), and
# the Spark driver process is not forked. imap yields results in input order.
with ThreadPool(4) as pool:
    pokemon_data = list(tqdm(pool.imap(fetch_data, urls), total=len(urls), desc="Fetching Pokémon Data"))

# Explicit schema: every column is nullable; all are integers except the name
schema = StructType(
    [
        StructField(field_name, field_type, True)
        for field_name, field_type in (
            ("id", IntegerType()),
            ("name", StringType()),
            ("base_experience", IntegerType()),
            ("height", IntegerType()),
            ("weight", IntegerType()),
            # Define other fields as needed
        )
    ]
)

# Build the Spark DataFrame from the fetched records using that schema
pokemon_df = spark.createDataFrame(pokemon_data, schema=schema)

# Render the DataFrame in the notebook
pokemon_df.display()

Fetching Pokémon Data:   0%|          | 0/3 [00:00<?, ?it/s]Fetching Pokémon Data:  33%|███▎      | 1/3 [00:00<00:00,  6.24it/s]Fetching Pokémon Data:  67%|██████▋   | 2/3 [00:00<00:00,  4.17it/s]Fetching Pokémon Data: 100%|██████████| 3/3 [00:00<00:00,  6.55it/s]


id,name,base_experience,height,weight
1,bulbasaur,64,7,69
2,ivysaur,142,10,130
3,venusaur,263,20,1000


In [0]:
# Function to fetch data and normalize nested fields
def fetch_data(url, timeout=10):
    """Fetch one Pokémon resource and flatten its nested fields.

    Args:
        url: Full URL of a single Pokémon resource,
            e.g. ``https://pokeapi.co/api/v2/pokemon/1/``.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, ``requests`` may block indefinitely on a stalled
            connection.

    Returns:
        dict with the scalar keys ``id``, ``name``, ``base_experience``,
        ``height`` and ``weight`` (``None`` when absent from the payload),
        plus:

        * ``abilities``: list of ability names,
        * ``types``: list of type names,
        * ``stats``: dict mapping stat name to its ``base_stat`` value.

        The three collections are empty when the payload lacks the
        corresponding key.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.RequestException: On connection failures or timeouts.
    """
    response = requests.get(url, timeout=timeout)
    # Fail loudly on HTTP errors rather than parsing an error body as data
    response.raise_for_status()
    payload = response.json()

    # Normalize abilities, types, and stats down to plain names / values
    abilities = [entry["ability"]["name"] for entry in payload.get("abilities", [])]
    types = [entry["type"]["name"] for entry in payload.get("types", [])]
    stats = {entry["stat"]["name"]: entry["base_stat"] for entry in payload.get("stats", [])}

    return {
        "id": payload.get("id"),
        "name": payload.get("name"),
        "base_experience": payload.get("base_experience"),
        "height": payload.get("height"),
        "weight": payload.get("weight"),
        "abilities": abilities,
        "types": types,
        "stats": stats,
    }

# Fetch all Pokémon URLs (paginated API)
def get_all_pokemon_urls(page_size=100, timeout=10):
    """Collect the resource URL of every Pokémon from the paginated listing.

    Starts at the listing endpoint and follows each page's ``next`` link
    until a page reports no further page.

    Args:
        page_size: Number of entries requested per page (the ``limit`` query
            parameter of the first request).
        timeout: Seconds to wait for each page before giving up.

    Returns:
        list of URL strings, in the order the listing returns them.

    Raises:
        requests.HTTPError: If any page answers with a 4xx/5xx status.
        requests.RequestException: On connection failures or timeouts.
        KeyError: If a page payload has no ``results`` key.
    """
    next_url = f"https://pokeapi.co/api/v2/pokemon?limit={page_size}"
    urls = []

    while next_url:
        response = requests.get(next_url, timeout=timeout)
        # Stop on an HTTP error instead of indexing into an error payload
        response.raise_for_status()
        page = response.json()

        urls.extend(pokemon["url"] for pokemon in page["results"])
        next_url = page.get("next")  # None / missing on the last page ends the loop

    return urls

# Get all Pokémon URLs by walking the paginated listing
urls = get_all_pokemon_urls()

# Fetch concurrently, with tqdm reporting progress. A thread pool is used instead
# of a process pool: the work is I/O-bound HTTP, threads need no pickling of the
# notebook-defined fetch_data (which fails under the "spawn" start method), and
# the Spark driver process is not forked. imap yields results in input order.
with ThreadPool(4) as pool:
    pokemon_data = list(tqdm(pool.imap(fetch_data, urls), total=len(urls), desc="Fetching Pokémon Data"))

# Let Spark infer the column types from the fetched records
pokemon_df = spark.createDataFrame(pokemon_data)

# Put the identifying columns (id, name) first; the others keep their relative order
ordered_columns = ["id", "name"]
ordered_columns.extend(
    column for column in pokemon_df.columns if column not in ("id", "name")
)
pokemon_df = pokemon_df.select(ordered_columns)

# Show the schema after reordering
print("Reordered schema of the Pokémon DataFrame:")
pokemon_df.printSchema()

# Render the DataFrame in the notebook
pokemon_df.display()

Fetching Pokémon Data:   0%|          | 0/1302 [00:00<?, ?it/s]Fetching Pokémon Data:   0%|          | 1/1302 [00:00<04:20,  5.00it/s]Fetching Pokémon Data:   0%|          | 5/1302 [00:00<01:32, 14.07it/s]Fetching Pokémon Data:   1%|          | 7/1302 [00:00<01:56, 11.09it/s]Fetching Pokémon Data:   1%|          | 12/1302 [00:00<01:26, 14.90it/s]Fetching Pokémon Data:   1%|▏         | 17/1302 [00:01<01:23, 15.35it/s]Fetching Pokémon Data:   2%|▏         | 21/1302 [00:01<01:12, 17.63it/s]Fetching Pokémon Data:   2%|▏         | 23/1302 [00:01<01:16, 16.81it/s]Fetching Pokémon Data:   2%|▏         | 27/1302 [00:01<01:10, 18.14it/s]Fetching Pokémon Data:   2%|▏         | 29/1302 [00:01<01:09, 18.37it/s]Fetching Pokémon Data:   2%|▏         | 32/1302 [00:01<01:01, 20.65it/s]Fetching Pokémon Data:   3%|▎         | 35/1302 [00:02<01:01, 20.71it/s]Fetching Pokémon Data:   3%|▎         | 38/1302 [00:02<01:05, 19.35it/s]Fetching Pokémon Data:   3%|▎         | 41/1302 [00:02<01:04, 

Reordered schema of the Pokémon DataFrame:
root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- abilities: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- base_experience: long (nullable = true)
 |-- height: long (nullable = true)
 |-- stats: map (nullable = true)
 |    |-- key: string
 |    |-- value: long (valueContainsNull = true)
 |-- types: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- weight: long (nullable = true)



id,name,abilities,base_experience,height,stats,types,weight
1,bulbasaur,"List(overgrow, chlorophyll)",64,7,"Map(special-defense -> 65, defense -> 49, special-attack -> 65, attack -> 49, hp -> 45, speed -> 45)","List(grass, poison)",69
2,ivysaur,"List(overgrow, chlorophyll)",142,10,"Map(special-defense -> 80, defense -> 63, special-attack -> 80, attack -> 62, hp -> 60, speed -> 60)","List(grass, poison)",130
3,venusaur,"List(overgrow, chlorophyll)",263,20,"Map(special-defense -> 100, defense -> 83, special-attack -> 100, attack -> 82, hp -> 80, speed -> 80)","List(grass, poison)",1000
4,charmander,"List(blaze, solar-power)",62,6,"Map(special-defense -> 50, defense -> 43, special-attack -> 60, attack -> 52, hp -> 39, speed -> 65)",List(fire),85
5,charmeleon,"List(blaze, solar-power)",142,11,"Map(special-defense -> 65, defense -> 58, special-attack -> 80, attack -> 64, hp -> 58, speed -> 80)",List(fire),190
6,charizard,"List(blaze, solar-power)",267,17,"Map(special-defense -> 85, defense -> 78, special-attack -> 109, attack -> 84, hp -> 78, speed -> 100)","List(fire, flying)",905
7,squirtle,"List(torrent, rain-dish)",63,5,"Map(special-defense -> 64, defense -> 65, special-attack -> 50, attack -> 48, hp -> 44, speed -> 43)",List(water),90
8,wartortle,"List(torrent, rain-dish)",142,10,"Map(special-defense -> 80, defense -> 80, special-attack -> 65, attack -> 63, hp -> 59, speed -> 58)",List(water),225
9,blastoise,"List(torrent, rain-dish)",265,16,"Map(special-defense -> 105, defense -> 100, special-attack -> 85, attack -> 83, hp -> 79, speed -> 78)",List(water),855
10,caterpie,"List(shield-dust, run-away)",39,3,"Map(special-defense -> 20, defense -> 35, special-attack -> 20, attack -> 30, hp -> 45, speed -> 45)",List(bug),29
