In [0]:
import requests
from typing import Dict, Tuple
from pyspark.sql import functions as F
from pyspark.sql.dataframe import DataFrame
from datetime import datetime

gcs_bucket = "gs://dotabricks-felipedmnq" # datalake 
gcs_promatches_hist_blob = f"{gcs_bucket}/raw/pro_matches_history"
pro_matches_endpoint = "https://api.opendota.com/api/proMatches" # last 100 matches

In [0]:
# dbutils.fs.mkdirs(f"{gcs_bucket}/raw")

In [0]:
# repartition - agg partitions to one unique partition - useful because our file is not big.
# coalesce works in the same way?


## REPARTITION returns a DF with exactly N partitions - WIDE transformation - SHUFFLE THE DATA - EVENLY BALANCED partition sizes.
# df.repartition(1).write.format("parquet").mode("append").save(gcs_promatches_blob)

## COALESCE returns a DF with exactly N partitions - NARROW transformation - NO SHUFFLE - NOT ABLE TO INCREASE PARTITIONS
# df.coalesce(1).write.format("parquet").mode("append").save(gcs_promatches_blob)

In [0]:
def get_data(url: str, **kwargs) -> Dict[str, any]:
    """Update url if needed. Get data from api as url specifications""" 
    
    if kwargs:
        params = "&".join([f"{k}={v}" for k, v in kwargs.items()])
        url += f"?{params}"
    
    response = requests.get(url)
    return response.json()

def get_min_match_id(df: DataFrame) -> int: 
    """return the oldest match from datalake files"""

    min_match_id = df.agg(F.min("match_id")).collect()[0][0]
    return min_match_id

def get_max_date(df: DataFrame) -> str:
    max_date = df.withColumn(
        "match_date", F.from_unixtime("start_time")
    ).agg(F.date_add(F.max(F.col("match_date")), -1)).collect()[0][0]
    
    return max_date

def get_min_date(df: DataFrame) -> str:
    min_date = df.withColumn(
        "match_date", F.from_unixtime("start_time")
    ).agg(F.date_add(F.min(F.col("match_date")), -1)).collect()[0][0]
    
    return min_date

def get_current_data_from_datalake() -> DataFrame:
    """Get current data from parquet files in Cloud GCS bucket"""
    
    current_data_from_blob = spark.read.format("parquet").load(gcs_promatches_hist_blob)
    return current_data_from_blob
    
def save_data_to_hist_blob(df: DataFrame) -> None:
    """Append data to datalake pro_matches_history file"""
    
    df.coalesce(1).write.format("parquet").mode("append").save(gcs_promatches_hist_blob)
    
def get_and_save(min_match_id: int) -> Tuple[DataFrame, int]:
    """"""
    
    new_data = get_data(pro_matches_endpoint, less_than_match_id=min_match_id)
    df_new_data = spark.createDataFrame(new_data)
    min_match_id = get_min_match_id(df_new_data)
    save_data_to_hist_blob(df_new_data)
    
    return df_new_data, min_match_id
    

In [0]:
def get_history_pro_matches() -> Tuple[DataFrame, int]:
    """Get raw data from andpoint based on last match_id from current data."""
    
    df = get_current_data_from_datalake()
    min_match_id = get_min_match_id(df)
#     data = get_data(pro_matches_endpoint, less_than_match_id=min_match_id)
#     new_data = get_and_save(min_match_id)
#     
#     min_match_id = get_min_match_id(df_new_data)
    
    while min_match_id is not None:
        print(min_match_id)
        try:
            _, min_match_id = get_and_save(min_match_id)
        except Exception as e:
            print(e)
            break

In [0]:
def get_new_pro_matches():
    df_from_lake = get_current_data_from_datalake()
    min_match_id = get_min_match_id(df_from_lake)
    max_date = get_max_date(df)
    min_date = get_min_date(df)
    processing_date = datetime.now().date()
    print(f"{max_date} - {processing_date} - {min_date}")
    
    while min_date < max_date:
        try:
            df_new_pro_matches, min_match_id = get_and_save(min_match_id)
            print(min_match_id)
            print(f"PROCESSING DATE: {processing_date}\nMAX DATE: {max_date}\nMIN_DATE: {min_date}")
        except Exception as e:
            print(e)
            break

In [0]:
get_new_pro_matches()

2022-12-17 - 2022-12-19 - 2022-07-22
6673145130
PROCESSING DATE: 2022-12-19
MAX DATE: 2022-12-17
MIN_DATE: 2022-07-22
6671896517
PROCESSING DATE: 2022-12-19
MAX DATE: 2022-12-17
MIN_DATE: 2022-07-22
6671183817
PROCESSING DATE: 2022-12-19
MAX DATE: 2022-12-17
MIN_DATE: 2022-07-22
6670082453
PROCESSING DATE: 2022-12-19
MAX DATE: 2022-12-17
MIN_DATE: 2022-07-22
6668608414
PROCESSING DATE: 2022-12-19
MAX DATE: 2022-12-17
MIN_DATE: 2022-07-22
6667544851
PROCESSING DATE: 2022-12-19
MAX DATE: 2022-12-17
MIN_DATE: 2022-07-22
6665821781
PROCESSING DATE: 2022-12-19
MAX DATE: 2022-12-17
MIN_DATE: 2022-07-22


In [0]:
df = get_current_data_from_datalake()

In [0]:
df.groupBy().agg(F.countDistinct("match_id"), F.count("match_id")).display()

count(match_id),count(match_id).1
9900,10000


In [0]:
min_match_id = get_and_save()

while min_match_id:
    min_match_id = get_and_save()
    print(min_match_id)

6841584956
6837648091


In [0]:
df = get_current_data_from_datalake()
display(df.count())
display(df.agg(F.countDistinct("match_id")))

2900

count(match_id)
2800


In [0]:
spark.read.format("parquet").load(gcs_promatches_hist_blob).createOrReplaceTempView("current_raw_pro_matches")

In [0]:
%sql

SELECT DATE_ADD(FROM_UNIXTIME(MAX(start_time)), -1) FROM current_raw_pro_matches

"date_add(from_unixtime(max(start_time), yyyy-MM-dd HH:mm:ss), -1)"
2022-12-17


In [0]:
type(df.withColumn("match_date", F.from_unixtime("start_time")).agg(F.date_add(F.max(F.col("match_date")), -1)).collect()[0][0])

Out[34]: datetime.date

In [0]:
df.withColumn("match_date", F.from_unixtime("start_time")).agg(F.date_add(F.max(F.col("match_date")), -1)).collect()[0][0]

Out[40]: datetime.date(2022, 12, 17)