In [14]:
import os
import requests
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta

# Date window to probe: from the start of 2024 up to the moment this cell runs.
start = datetime(2024, 1, 1)
end = datetime.today()

# One entry per calendar day, beginning at `start` (midnight) and stopping at
# the last midnight that is not after `end`.
date_list = [
    start + timedelta(days=offset) for offset in range((end - start).days + 1)
]

# Candidate dumpstatus.json URL for every day in the window.
url_list = [
    f"https://dumps.wikimedia.org/enwiki/{date:%Y%m%d}/dumpstatus.json"
    for date in date_list
]

# exist_ok avoids the check-then-create race and keeps the cell re-runnable.
os.makedirs("enwiki_dumpstatus_metadata", exist_ok=True)


def download_metadata(
    url: str,
    output_dir: str = "enwiki_dumpstatus_metadata",
    timeout: float = 30.0,
) -> None:
    """Download one document and store it under ``output_dir``.

    The local file name is the last three ``/``-separated segments of ``url``
    joined with underscores.

    Args:
        url: Address of the document to fetch.
        output_dir: Existing directory that receives the file.
        timeout: Seconds to wait on the server, so a stalled connection cannot
            block a worker thread indefinitely.

    Raises:
        requests.HTTPError: For any error status other than 404. A 404 is
            skipped silently; other error bodies are refused rather than being
            saved as if they were valid metadata.
        requests.RequestException: On connection failures or timeouts.
    """
    url_stub = url.split("/")
    file_name = os.path.join(
        output_dir, f"{url_stub[-3]}_{url_stub[-2]}_{url_stub[-1]}"
    )
    headers = {"User-Agent": "SimpleGetMetadataBot/0.0"}
    # The context manager releases the streamed connection on every exit path.
    with requests.get(
        url, headers=headers, stream=True, timeout=timeout
    ) as response:
        if response.status_code == 404:
            return
        response.raise_for_status()
        with open(file_name, "wb") as file:
            for chunk in response.iter_content(chunk_size=8192):
                file.write(chunk)


# Fan the downloads out over 12 worker threads, then wait on each task in
# submission order; result() re-raises any exception raised in a worker.
with ThreadPoolExecutor(max_workers=12) as pool:
    pending = []
    for target_url in url_list:
        pending.append(pool.submit(download_metadata, target_url))
    for task in pending:
        task.result()

In [19]:
# Merge every downloaded document into a single file.
# Files are visited in sorted name order so the output is reproducible
# (os.listdir returns entries in arbitrary order), and each document is
# terminated with exactly one newline so consecutive documents never run
# together on the same line.
# NOTE(review): a later cell reads "data/enwiki_dumpstatus_metadata.json";
# confirm whether this output is expected to be moved there.
with open("enwiki_dumpstatus_metadata.json", "w", encoding="utf-8") as write_file:
    for entry in sorted(os.listdir("enwiki_dumpstatus_metadata")):
        source_path = os.path.join("enwiki_dumpstatus_metadata", entry)
        with open(source_path, "r", encoding="utf-8") as read_file:
            document = read_file.read().strip()
        if document:  # skip empty downloads instead of emitting blank lines
            write_file.write(document + "\n")

In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import (
    StructField,
    StructType,
    IntegerType,
    FloatType,
    StringType,
    ArrayType,
    MapType,
)

# Local Spark session (master "local[*]").
spark = SparkSession.builder.master("local[*]").getOrCreate()

# Read the file as plain text: one `value` row per line, parsed as JSON in
# later cells. No "multiline" option is passed: that is a JSON/CSV reader
# option, not a text-source one, so setting it here has no effect.
ndjson_path = "data/enwiki_dumpstatus_metadata.json"
df = spark.read.text(ndjson_path)

your 131072x1 screen size is bogus. expect trouble
24/09/30 02:57:00 WARN Utils: Your hostname, EdenHimmel1316 resolves to a loopback address: 127.0.1.1; using 172.25.138.88 instead (on interface eth0)
24/09/30 02:57:00 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/30 02:57:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Preview the raw text rows (defaults spelled out: 20 rows, truncated cells).
df.show(n=20, truncate=True)

+--------------------+
|               value|
+--------------------+
|{"jobs": {"xmlstu...|
|{"jobs": {"xmlstu...|
|{"jobs": {"xmlstu...|
|{"jobs": {"xmlstu...|
|{"jobs": {"xmlstu...|
|{"jobs": {"xmlstu...|
|{"jobs": {"xmlstu...|
+--------------------+



In [3]:
# Tag each raw line with a generated id and parse its top-level JSON object
# into a string -> string map; nested objects stay as raw JSON text.
top_level_schema = MapType(StringType(), StringType())
df1 = df.select(
    F.monotonically_increasing_id().alias("id"),
    F.from_json(F.col("value"), top_level_schema).alias("col"),
)

In [4]:
# Preview the parsed map column (defaults spelled out: 20 rows, truncated cells).
df1.show(n=20, truncate=True)

+---+--------------------+
| id|                 col|
+---+--------------------+
|  0|{jobs -> {"xmlstu...|
|  1|{jobs -> {"xmlstu...|
|  2|{jobs -> {"xmlstu...|
|  3|{jobs -> {"xmlstu...|
|  4|{jobs -> {"xmlstu...|
|  5|{jobs -> {"xmlstu...|
|  6|{jobs -> {"xmlstu...|
+---+--------------------+



In [7]:
# Turn each map entry into its own (id, key, value) row.
# NOTE: this rebinds df1, so the cell cannot be run twice in a row; after the
# first run the frame no longer has a "col" column.
df1 = df1.select(F.col("id"), F.explode_outer(F.col("col")))

In [8]:
# Preview the exploded key/value rows (defaults spelled out).
df1.show(n=20, truncate=True)

+---+-------+--------------------+
| id|    key|               value|
+---+-------+--------------------+
|  0|   jobs|{"xmlstubsdumprec...|
|  0|version|                 0.8|
|  1|   jobs|{"xmlstubsdumprec...|
|  1|version|                 0.8|
|  2|   jobs|{"xmlstubsdumprec...|
|  2|version|                 0.8|
|  3|   jobs|{"xmlstubsdumprec...|
|  3|version|                 0.8|
|  4|   jobs|{"xmlstubsdumprec...|
|  4|version|                 0.8|
|  5|   jobs|{"xmlstubsdumprec...|
|  5|version|                 0.8|
|  6|   jobs|{"xmlstubsdumprec...|
|  6|version|                 0.8|
+---+-------+--------------------+



In [16]:
# Back to one row per id, with every distinct key promoted to its own column.
pivot_df1 = (
    df1.groupBy(F.col("id"))
    .pivot("key")
    .agg(F.first(F.col("value")))
)

In [10]:
# Preview the pivoted frame (defaults spelled out: 20 rows, truncated cells).
pivot_df1.show(n=20, truncate=True)

+---+--------------------+-------+
| id|                jobs|version|
+---+--------------------+-------+
|  0|{"xmlstubsdumprec...|    0.8|
|  1|{"xmlstubsdumprec...|    0.8|
|  2|{"xmlstubsdumprec...|    0.8|
|  3|{"xmlstubsdumprec...|    0.8|
|  4|{"xmlstubsdumprec...|    0.8|
|  5|{"xmlstubsdumprec...|    0.8|
|  6|{"xmlstubsdumprec...|    0.8|
+---+--------------------+-------+



In [41]:
# Parse the "jobs" JSON object into a string -> string map; each map value is
# still unparsed JSON text.
df2 = pivot_df1.select(
    "id",
    "version",
    F.from_json("jobs", schema=MapType(StringType(), StringType())).alias("col"),
)

# One row per map entry: the key becomes dump_group and the value becomes
# dump_info. No pivot happens here despite the variable name, which is kept
# because later cells reference it.
pivot_df2 = df2.select(
    "id", "version", F.explode_outer("col").alias("dump_group", "dump_info")
)

In [38]:
# Show full, untruncated cell contents for the first 20 rows.
pivot_df2.show(n=20, truncate=False)

+---+-------+------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [28]:
import ast
from pyspark.sql.functions import udf

@udf(MapType(StringType(), StringType()))
def parse_line(line):
    """Parse one serialized mapping for use as a map column.

    JSON is tried first because ``ast.literal_eval`` on its own rejects the
    JSON literals ``true``, ``false`` and ``null``. Python-literal syntax
    (for example single-quoted keys) is still accepted as a fallback.

    Args:
        line: Text holding a JSON object or a Python dict literal, or None.

    Returns:
        The parsed object, or None when ``line`` is None.

    Raises:
        ValueError, SyntaxError: When the text is neither valid JSON nor a
            valid Python literal.
    """
    # Function-scope import: json is not imported at module level.
    import json

    if line is None:
        return None
    # NOTE(review): values that are not strings (e.g. nested objects) may not
    # fit the declared string -> string map type; confirm against real input.
    try:
        return json.loads(line)
    except ValueError:
        return ast.literal_eval(line)

In [65]:
# Parse each dump_info JSON payload into a map and emit one row per
# (id, version, dump_group, key, value).
dump_info_map = F.from_json(
    "dump_info", schema=MapType(StringType(), StringType())
).alias("dump_info")
dump_fields = pivot_df2.select(
    "id", "version", "dump_group", F.explode(dump_info_map)
)

# Promote every payload key to its own whitespace-trimmed column, then drop
# the id column from the result.
df3 = (
    dump_fields.groupBy("id", "version", "dump_group")
    .pivot("key")
    .agg(F.trim(F.first("value")))
    .drop("id")
)

In [66]:
# Preview the per-job frame (defaults spelled out: 20 rows, truncated cells).
df3.show(n=20, truncate=True)

+-------+--------------------+--------------------+------+-------------------+
|version|          dump_group|               files|status|            updated|
+-------+--------------------+--------------------+------+-------------------+
|    0.8|       abstractsdump|{"enwiki-20240720...|  done|2024-07-23 05:29:01|
|    0.8|abstractsdumpreco...|{"enwiki-20240720...|  done|2024-07-23 05:29:57|
|    0.8|   allpagetitlesdump|{"enwiki-20240720...|  done|2024-07-22 18:28:43|
|    0.8|        articlesdump|{"enwiki-20240720...|  done|2024-07-22 10:50:58|
|    0.8|articlesdumprecom...|{"enwiki-20240720...|  done|2024-07-22 12:24:48|
|    0.8|articlesmultistre...|{"enwiki-20240720...|  done|2024-07-22 13:04:25|
|    0.8|articlesmultistre...|{"enwiki-20240720...|  done|2024-07-22 13:34:45|
|    0.8|          babeltable|{"enwiki-20240720...|  done|2024-07-20 10:26:44|
|    0.8|  categorylinkstable|{"enwiki-20240720...|  done|2024-07-20 08:47:50|
|    0.8|       categorytable|{"enwiki-20240720...| 

In [62]:
# Most recently updated dump jobs first. This uses df3: no visible cell
# assigns `df4`, so the old reference raises NameError on a fresh kernel, and
# df3 has the same columns as the output recorded for this cell.
df3.orderBy(F.col("updated").desc()).show()

+-------+--------------------+--------------------+------+-------------------+
|version|          dump_group|               files|status|            updated|
+-------+--------------------+--------------------+------+-------------------+
|    0.8|articlesmultistre...|{"enwiki-20240920...|  done|2024-09-28 00:25:33|
|    0.8|articlesmultistre...|{"enwiki-20240920...|  done|2024-09-28 00:10:01|
|    0.8|xmlpagelogsdumpre...|{"enwiki-20240920...|  done|2024-09-27 23:49:39|
|    0.8|     xmlpagelogsdump|{"enwiki-20240920...|  done|2024-09-27 23:46:12|
|    0.8|metacurrentdumpre...|{"enwiki-20240920...|  done|2024-09-27 23:02:53|
|    0.8|     metacurrentdump|{"enwiki-20240920...|  done|2024-09-27 20:29:21|
|    0.8|articlesdumprecom...|{"enwiki-20240920...|  done|2024-09-27 19:16:58|
|    0.8|        articlesdump|{"enwiki-20240920...|  done|2024-09-27 17:59:51|
|    0.8|xmlstubsdumprecom...|{"enwiki-20240920...|  done|2024-09-27 16:37:28|
|    0.8|        xmlstubsdump|{"enwiki-20240920...| 

In [63]:
import pendulum
# "Today" in the Asia/Ho_Chi_Minh time zone, shifted back by one day.
previous_creation_day = pendulum.today(tz="Asia/Ho_Chi_Minh").subtract(days=1)