In [0]:
# Storage account (ADLS Gen2 `dfs.core.windows.net` endpoint) used by every abfss:// path below.
storage_account = 'wikidatasubset'

# Give Spark the account key so abfss:// reads/writes on this account authenticate.
# The key is pulled from a Databricks secret scope instead of being written into the notebook.
_account_key_conf = f"fs.azure.account.key.{storage_account}.dfs.core.windows.net"
spark.conf.set(_account_key_conf, dbutils.secrets.get(scope="databricks-secret-keys", key="blob"))

In [0]:
from pyspark.sql.types import MapType, ArrayType, StructType, StructField, StringType

# Explicit read schema for the labels pass: only `id`, `type` and `labels` are parsed from
# each JSON record; all other fields are ignored at read time.
# One label entry: a record with string `language` and `value` fields.
labels_struct = StructType([StructField(name, StringType(), True) for name in ("language", "value")])

# `labels` is a string-keyed map of label entries (the key is exploded into a `language` column later).
labels_map = MapType(StringType(), labels_struct)

labels_schema = StructType(
    [StructField(name, StringType(), True) for name in ("id", "type")]
    + [StructField("labels", labels_map, True)]
)

In [0]:
# Location of the full Wikidata JSON dump inside the `init` container.
container = 'init'
path = 'wikidatawiki/entities/latest-all.json'

# Supplying the schema up front lets Spark skip schema inference over the (large) dump
# and only materialise the fields declared in `labels_schema`.
labels_source_uri = f'abfss://{container}@{storage_account}.dfs.core.windows.net/{path}'
df_label = spark.read.schema(labels_schema).json(labels_source_uri)

In [0]:
from pyspark.sql.functions import explode, col

# Produce one row per (item id, language) with the label text:
#  - keep only records whose `type` is 'item'
#  - explode the `labels` map into (language, struct) columns
#  - pull the label string out of the struct's `value` field
label_items = df_label.filter(col("type") == 'item')
label_entries = label_items.select("id", explode("labels").alias("language", "struct"))
df_label_final = label_entries.select(
    "id",
    "language",
    col("struct").getField("value").alias("label"),
)

In [0]:
# Persist the per-language item labels as Parquet. No save mode is set, so Spark's default
# ("error"/"errorifexists") applies and re-running this cell fails if the directory exists.
# NOTE(review): the directory is named `aliases` but the data comes from the `labels` field —
# confirm downstream readers expect labels at this path.
labels_output_uri = f'abfss://{container}@{storage_account}.dfs.core.windows.net/preprocessed/aliases'
df_label_final.write.parquet(labels_output_uri)

In [0]:
from pyspark.sql.types import MapType, ArrayType, StructType, StructField, StringType

# Explicit read schema for the claims pass. Only the path
#     claims[<key>][i].mainsnak.datavalue.value.id
# is parsed (plus the top-level `id`); each nested level is a struct holding a single
# nullable field that wraps the next level down.
value_struct = StructType([StructField("id", StringType(), True)])
datavalue_struct = StructType([StructField("value", value_struct, True)])
mainsnak_struct = StructType([StructField("datavalue", datavalue_struct, True)])
claims_struct = StructType([StructField("mainsnak", mainsnak_struct, True)])

# `claims` maps a string key (exploded later as the claim's property) to an array of statements.
claims_map = MapType(StringType(), ArrayType(claims_struct))

claims_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (("id", StringType()), ("claims", claims_map))
])

In [0]:
# Same Wikidata dump as the labels pass, re-read with the claims-only schema.
container = 'init'
path = 'wikidatawiki/entities/latest-all.json'

# The explicit schema avoids inference and limits parsing to the claim fields declared above.
claims_source_uri = f'abfss://{container}@{storage_account}.dfs.core.windows.net/{path}'
df_claims = spark.read.schema(claims_schema).json(claims_source_uri)

In [0]:
from pyspark.sql.functions import explode, col, explode_outer

# Flatten claims into an edge list with columns (src, dst, property):
#  1. explode the `claims` map   -> one row per (entity id, claim key) with its statement array
#  2. explode the statement array -> one row per statement
#  3. take mainsnak.datavalue.value.id of each statement as the edge target
# NOTE(review): `dst` is null whenever a statement has no `datavalue.value.id` (presumably
# non-entity values or snaks without a datavalue) — confirm whether those rows are wanted.
# NOTE(review): no `type == 'item'` filter is applied here, so claims of every entity type are
# kept — confirm this is intended.
claims_by_key = df_claims.select("id", explode("claims").alias("claim", "claim_datas"))
claim_statements = claims_by_key.select("id", "claim", explode("claim_datas").alias("claim_data"))
df_claims_final = claim_statements.select(
    col("id").alias("src"),
    col("claim_data.mainsnak.datavalue.value.id").alias("dst"),
    col("claim").alias("property"),
)

In [0]:
# Persist the claim edge list as Parquet. No save mode is set, so Spark's default
# ("error"/"errorifexists") applies and re-running this cell fails if the directory exists.
claims_output_uri = f'abfss://{container}@{storage_account}.dfs.core.windows.net/preprocessed/claims'
df_claims_final.write.parquet(claims_output_uri)

In [0]:
# Load the persisted claim edge list back from its Parquet directory.
claims_parquet_uri = f'abfss://{container}@{storage_account}.dfs.core.windows.net/preprocessed/claims'
df_claims_final_read = spark.read.format('parquet').load(claims_parquet_uri)