<a href="https://colab.research.google.com/github/DenysNunes/data-examples/blob/main/spark/4%20-%20user%20cases/wows_data_case.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Lakehouse: World of Warships API

In this case, I'm used a public api from WOWS game to create a mini example of 
lakehouse. <br>
Therefore, in the first part of this tutorial, I'm getting data from the API using a single requests methods. <br>
Next, all data is saved in a AWS s3 bucket with a layers pattern of lakehouse building.

▶ Observation:

* This is a simple api crawler, a better design could be deployed on a k8s cluster or using a specialized tool like Airbyte.

In [1]:
!pip uninstall -y datascience
!pip install -q pyspark==3.2.0
!pip install -q boto3==1.20.37
!pip install -q awscli==1.22.37

Found existing installation: datascience 0.10.6
Uninstalling datascience-0.10.6:
  Successfully uninstalled datascience-0.10.6
[K     |████████████████████████████████| 281.3 MB 41 kB/s 
[K     |████████████████████████████████| 198 kB 55.7 MB/s 
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
[K     |████████████████████████████████| 131 kB 4.2 MB/s 
[K     |████████████████████████████████| 79 kB 7.2 MB/s 
[K     |████████████████████████████████| 8.5 MB 33.1 MB/s 
[K     |████████████████████████████████| 138 kB 70.8 MB/s 
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
requests 2.23.0 requires urllib3!=1.25.0,!=1.25.1,<1.26,>=1.21.1, but you have urllib3 1.26.8 which is incompatible.[0m
[K     |████████████████████████████████| 3.8 MB 3.7 MB/s 
[K     |████████████████████████████████| 547 kB 44.3 MB/s 
[K     |████████████

## ▶ Introducing

### ▶ Credentials input

AWS Account credentials and bucket configuration.

In [None]:
aws_access_key = input('Tip you AWS Access Key:')
aws_secret_key = input('Tip you AWS Secret Key:')
aws_bucket_work = input('Tip you bucket (Eg. my-bucket-teste):')

### ▶ Application initializing

* Imports
* Setting env variables
* Init spark session
* Get data in wows api.

In [3]:
import os
import pytz
import time
import json
import boto3
import requests

from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name

partition = datetime.now().strftime('%Y-%m-%d')

bronze_key = f'wows/bronze/'
silver_path = f"s3a://{aws_bucket_work}/wows/silver/"
gold_path = f"s3a://{aws_bucket_work}/wows/gold/"

os.environ["AWS_ACCESS_KEY_ID"] = aws_access_key
os.environ["AWS_SECRET_ACCESS_KEY"] = aws_secret_key
os.environ["DATALAKE_BUCKET"] = aws_bucket_work

s3 = boto3.resource('s3')

spark = SparkSession \
    .builder \
    .master('local[*]') \
    .appName("World of Warships Lakehouse APP") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:1.1.0,org.apache.hadoop:hadoop-aws:3.3.1,com.amazonaws:aws-java-sdk:1.12.141") \
    .enableHiveSupport() \
    .getOrCreate()



In [4]:
def send_request(url: str, payload: dict = {}, method: str = "get"):
    if method == "get":
        r = requests.get(url, params=payload)
        return r.json()

tz = "Brazil/East"
server_timezone = pytz.timezone(tz)
server_time = datetime.now(server_timezone)

# Clan - ID
clan_names = ["Diversão Brasil", 
              "Star Brasil", 
              "MARINHA DO BRASIL", 
              "FORÇA EXPEDICIONÁRIA BRASILEIRA", 
              "Brasil",
              "BRASIL ACIMA DE TUDO",
              "Brasil Império dos Mares"]
              
clan_p = [{"application_id": "d34126e87e37087d0337e9365f839ba3",  
           "search": x, 
           "fields": 
           "clan_id,name,tag"} for x in clan_names ]
clan_url = "http://api.worldofwarships.com/wows/clans/list/"

clans = [send_request(clan_url, x) for x in clan_p]
clans = [(x['data'][0]['name'], x['data'][0]['clan_id']) for x in clans]

# Clan Details - Members 
clan_details_url = "http://api.worldofwarships.com/wows/clans/info/"

players = []

player_p = {"application_id": "d34126e87e37087d0337e9365f839ba3"}
player_url = "http://api.worldofwarships.com/wows/account/info/"


for cl in clans:
    clan_name, clan_id = cl
    clan_details_p = {"application_id": 
                      "d34126e87e37087d0337e9365f839ba3",  
                      "clan_id": clan_id, 
                      "fields": "members_ids"}    

    clan_details = send_request(clan_details_url, 
                                clan_details_p)['data'][str(clan_id)]
    clan_members = clan_details['members_ids']
    for m in clan_members:
      attributes = ['last_battle_time', 'account_id']

      pnp_copy = player_p.copy()
      pnp_copy['account_id'] = m
      player_details = send_request(player_url, pnp_copy)['data']
      
      data = player_details[str(m)]
      data['account_id'] = m
      data['clan_id'] = clan_id
      data['clan_name'] = clan_name
      players.append(data)

### ▶ Bucket Cleansing

Clean all wows/ key

In [5]:
!aws s3 rm s3://$DATALAKE_BUCKET/wows/ --recursive --quiet

## ▶ Layers

### ▶ Bronze Layer

Is a good pattern create a bronze layer with a partition to split data and enable a day based reprocessing feature.

In [6]:
bronze_file = s3.Object(aws_bucket_work, bronze_key + f"tb_players/bronze_dt_exec={partition}/data.json")
bronze_file.put(Body=bytes(json.dumps(players), encoding='utf-8'))

{'ETag': '"1fddec0ce192f8720f7dec73947376b9"',
 'ResponseMetadata': {'HTTPHeaders': {'content-length': '0',
   'date': 'Thu, 20 Jan 2022 04:00:30 GMT',
   'etag': '"1fddec0ce192f8720f7dec73947376b9"',
   'server': 'AmazonS3',
   'x-amz-id-2': 'JfwQOjhKebtKxOff47IsQz5XUpcnZ39SjGzbwqbIjec2ltYQCoOPD6QUOo7zBW68hAK2qSXytDA=',
   'x-amz-request-id': 'T8S6NM6Y67NX58H6'},
  'HTTPStatusCode': 200,
  'HostId': 'JfwQOjhKebtKxOff47IsQz5XUpcnZ39SjGzbwqbIjec2ltYQCoOPD6QUOo7zBW68hAK2qSXytDA=',
  'RequestId': 'T8S6NM6Y67NX58H6',
  'RetryAttempts': 1}}

#### ▶ Verifying bronze data

Querying data from s3

In [7]:
df_bronze = spark.read.json(f's3a://{aws_bucket_work}/{bronze_key}/tb_players/')

# Another good practice is put the name of file source for future debug
df_bronze = df_bronze.withColumn("bronze_input_file", input_file_name())

df_bronze.createOrReplaceTempView('temp_tb_players_bronze')
df_bronze.show()

+----------+----------+---------------+----------+--------------+-----+----------------+---------------+-------------+----------+-----------------+-------+--------------------+----------------+----------+--------------+--------------------+
|account_id|   clan_id|      clan_name|created_at|hidden_profile|karma|last_battle_time|leveling_points|leveling_tier| logout_at|         nickname|private|          statistics|stats_updated_at|updated_at|bronze_dt_exec|   bronze_input_file|
+----------+----------+---------------+----------+--------------+-----+----------------+---------------+-------------+----------+-----------------+-------+--------------------+----------------+----------+--------------+--------------------+
|1000275761|1000075507|Diversão Brasil|1438905943|         false| null|      1642632995|           9433|           15|1642636617|        rogerio19|   null|{9208, 300780, {{...|      1642636632|1642636632|    2022-01-20|s3a://denys-data-...|
|1009995203|1000075507|Diversão Bras

### ▶ Silver Layer

Following the best practices of lakehouse building, Silver layer is a section of data  most augmented and performatic. <br>
Some transformations and optimizations could be done here.


In [8]:
from delta.tables import DeltaTable

# if delta table already exists, a merge operation will be performed
# if not, a one will be created
def create_or_merge_table(source_df, merge_mapping, table_path, partition):
  if DeltaTable.isDeltaTable(spark, table_path):
    target_delta = DeltaTable.forPath(spark, table_path)
    target_delta.alias("target") \
                   .merge(
                       source_df.alias("source"), 
                       merge_mapping
                   ) \
                   .whenMatchedUpdateAll() \
                   .whenNotMatchedInsertAll() \
                   .execute()
  else:
    source_df.write \
             .format("delta") \
              .save(table_path, 
                      partitionBy=partition)



# Creating a main silver
df_silver_main = spark.sql(f"""

SELECT account_id, 
       clan_id, 
       clan_name, 
       created_at, 
       hidden_profile, 
       karma, 
       last_battle_time, 
       leveling_points, 
       leveling_tier, 
       logout_at, 
       nickname, 
       private, 
       stats_updated_at, 
       updated_at, 
       bronze_dt_exec, 
       bronze_input_file,
       current_date() as silver_dt_exec 
FROM temp_tb_players_bronze
WHERE bronze_dt_exec = CAST('{partition}' AS DATE)

""").repartition(4)

df_silver_main = df_silver_main.repartition(4)
create_or_merge_table(df_silver_main, 
                      "target.account_id = source.account_id",
                      f'{silver_path}tb_players_default/',
                      ["clan_id"])

# Creating another silver with statistics table
# But, is good to remember that`s can vary with you gold needs
# The main objective of silver layer is to be a performatic interface for gold tables

df_silver_stats = spark.sql(f"""

SELECT account_id, 
       clan_id,
       clan_name,
       statistics.distance,
       statistics.pvp.*,
       bronze_dt_exec,
       bronze_input_file,
       current_date() as silver_dt_exec 
FROM  temp_tb_players_bronze
WHERE bronze_dt_exec = CAST('{partition}' AS DATE)

""").repartition(4)

create_or_merge_table(df_silver_stats, 
                      "target.account_id = source.account_id",
                      f'{silver_path}tb_players_statistics/',
                      ["clan_id"])

### ▶ Gold Layer

This table is an agg to create a view about loss and win rates grouped by clan_id (already partitioned in SILVER to optimization of gold read).

In [12]:
df_gold_rates = spark.sql(f"""

SELECT clan_id,
       clan_name, 
       sum(battles) as battles, 
       sum(losses) as losses,
       sum(wins) as wins,
       format_number(sum(losses) * 100 / sum(battles), 4) as loss_rate,
       format_number(sum(wins) * 100 / sum(battles), 4) as win_rate
FROM DELTA.`{silver_path}tb_players_statistics/` 
GROUP BY clan_id, clan_name

""")

# This gold has few data, don't need partition
df_gold_rates = df_gold_rates.repartition(4)
create_or_merge_table(df_gold_rates, 
                      "target.clan_name = source.clan_name",
                      f'{gold_path}tb_clan_rates/',
                      None)

In [13]:
spark.read.load(f'{gold_path}tb_clan_rates/', format='delta') \
          .show(200, False)

+----------+---------------------------------------+-------+------+------+---------+--------+
|clan_id   |clan_name                              |battles|losses|wins  |loss_rate|win_rate|
+----------+---------------------------------------+-------+------+------+---------+--------+
|1000072137|BRASIL ACIMA DE TUDO                   |2231   |1157  |1057  |51.8602  |47.3779 |
|1000044402|Brasil                                 |70658  |36432 |33992 |51.5610  |48.1078 |
|1000060544|Brasil Império dos Mares               |8391   |4146  |4234  |49.4101  |50.4588 |
|1000075507|Diversão Brasil                        |233750 |124021|109526|53.0571  |46.8560 |
|1000052530|MARINHA DO BRASIL                      |285570 |146632|138693|51.3471  |48.5671 |
|1000044019|Star Brasil                            |405769 |176619|228740|43.5270  |56.3720 |
|1000066881|✠♕♔ FORÇA EXPEDICIONÁRIA BRASILEIRA ♔♕✠|144279 |74990 |69203 |51.9757  |47.9647 |
+----------+---------------------------------------+-------+

In [14]:
!aws s3 ls $DATALAKE_BUCKET --recursive

2022-01-20 04:00:30     463138 wows/bronze/tb_players/bronze_dt_exec=2022-01-20/data.json
2022-01-20 04:03:23       1811 wows/gold/tb_clan_rates/_delta_log/00000000000000000000.json
2022-01-20 04:08:39       1644 wows/gold/tb_clan_rates/_delta_log/00000000000000000001.json
2022-01-20 04:03:17       2113 wows/gold/tb_clan_rates/part-00000-3644ad3d-287b-4f1d-bd27-20c3628abfc6-c000.snappy.parquet
2022-01-20 04:08:36       2571 wows/gold/tb_clan_rates/part-00000-e5e90ae5-28e1-44dc-9a56-117cc202fa22-c000.snappy.parquet
2022-01-20 04:03:17       2268 wows/gold/tb_clan_rates/part-00001-448f1dc7-b049-4130-b7be-505173b141be-c000.snappy.parquet
2022-01-20 04:03:19       2151 wows/gold/tb_clan_rates/part-00002-0480b9cc-a871-4882-a416-c1dfafa6eff0-c000.snappy.parquet
2022-01-20 04:03:19       2098 wows/gold/tb_clan_rates/part-00003-1681af3d-3cac-4da8-a9db-bf454b8b5b49-c000.snappy.parquet
2022-01-20 04:01:29       7707 wows/silver/tb_players_default/_delta_log/00000000000000000000.json
2022-01-20 0

#### ▶ Describing Gold

I runned gold process twice to test merge feature and visualize table version

In [15]:
spark.sql(f"""

DESCRIBE HISTORY delta.`{gold_path}tb_clan_rates/`

""").show()

+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|      1|2022-01-20 04:08:39|  null|    null|    MERGE|{predicate -> (ta...|null|    null|     null|          0|  Serializable|        false|{numTargetRowsCop...|        null|Apache-Spark/3.2....|
|      0|2022-01-20 04:03:23|  null|    null|    WRITE|{mode -> ErrorIfE...|null|    null|     null|       null|  Serializable|         true|{numFiles -> 4, n...|        null|Apache-Spark/3.2....|
+-------+------