In [4]:
import dtale
import pyspark.sql.functions as F

from pushcart.metadata import Metadata
from pyspark.sql import SparkSession


In [2]:
# Reuse the active Spark session if there is one, otherwise start a new one.
spark = SparkSession.builder.getOrCreate()

# CSV reader settings: the first line holds the column names, and `"` is the
# escape character for quotes inside quoted values.
# No schema is supplied, so every column is read as a string.
csv_options = {"header": "true", "escape": '"'}
df = spark.read.format("csv").options(**csv_options).load("./dataset.csv")

print(df.schema)
df.show()


23/07/28 17:16:25 WARN Utils: Your hostname, laptop resolves to a loopback address: 127.0.1.1; using 192.168.2.13 instead (on interface enp0s20f0u1u4u4)
23/07/28 17:16:25 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/28 17:16:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


StructType([StructField('id', StringType(), True), StructField('ts', StringType(), True), StructField('payload', StringType(), True)])
+---+--------------------+--------------------+
| id|                  ts|             payload|
+---+--------------------+--------------------+
|  0|2023-07-13T17:26:...|{"current_page": ...|
|  1|2023-07-13T17:27:...|{"current_page": ...|
|  2|2023-07-13T17:27:...|{"current_page": ...|
|  3|2023-07-13T17:27:...|{"current_page": ...|
|  4|2023-07-13T17:27:...|{"current_page": ...|
|  5|2023-07-13T17:27:...|{"current_page": ...|
|  6|2023-07-13T17:27:...|{"current_page": ...|
|  7|2023-07-13T17:27:...|{"current_page": ...|
|  8|2023-07-13T17:27:...|{"current_page": ...|
+---+--------------------+--------------------+



In [3]:
# Build pushcart metadata for the raw DataFrame loaded from the CSV.
# NOTE(review): infer_fraction=1.0 presumably means inference looks at 100% of
# the rows — confirm against the pushcart documentation.
md = Metadata(df, infer_fraction=1.0)
# Disabled by the author; the same call is made on the typed frame further down.
# md.get_metadata()

23/07/28 17:16:44 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [None]:
# Display the metadata table held by `md` (last expression, so it is rendered
# as the cell output).
# Author's note: edits are reflected in the underlying dataset.
# NOTE(review): presumably this renders an editable grid whose changes are
# written back to `md` — confirm how pushcart exposes this.
md.metadata_df

In [None]:
# Generate transformation code from the metadata in `md`.
# NOTE(review): the return value is only stored here, not displayed; the cell
# that builds `processed_df` looks like this output pasted in — confirm.
code = md.generate_code()

In [None]:
# Spark DDL describing the JSON document stored in the `payload` column.
# Written one field per line for readability; the adjacent literals concatenate
# into a single schema string.
PAYLOAD_SCHEMA = (
    "struct<"
    "current_page:bigint,"
    "data:array<struct<fact:string,length:bigint>>,"
    "first_page_url:string,"
    "from:bigint,"
    "last_page:bigint,"
    "last_page_url:string,"
    "links:array<struct<active:boolean,label:string,url:string>>,"
    "next_page_url:string,"
    "path:string,"
    "per_page:bigint,"
    "prev_page_url:string,"
    "to:bigint,"
    "total:bigint"
    ">"
)

# Give the string columns read from the CSV their real types:
#   id      -> int
#   ts      -> timestamp, parsed with an explicit microsecond-precision pattern
#   payload -> struct, parsed from the JSON text using PAYLOAD_SCHEMA
# NOTE(review): from_json yields null for strings it cannot parse; consider
# checking for unexpected nulls after this step.
processed_df = (
    df.withColumn("id", F.col("id").cast("int"))
    .withColumn("ts", F.to_timestamp(F.col("ts"), "yyyy-MM-dd'T'HH:mm:ss.SSSSSS"))
    .withColumn("payload", F.from_json(F.col("payload"), schema=PAYLOAD_SCHEMA))
    .select(["id", "ts", "payload"])
)

print(processed_df.schema)
processed_df.show()


In [None]:
# Build a second Metadata object, this time over the typed frame whose
# `payload` column is a parsed struct rather than a JSON string.
# NOTE(review): presumably this lets pushcart describe the nested fields —
# confirm against the pushcart documentation.
nested_md = Metadata(processed_df, infer_fraction=1.0)
# Last expression of the cell, so its return value is shown as the output.
nested_md.get_metadata()

In [None]:
# Generate code from the metadata of the typed, nested frame.
# NOTE(review): the return value is only stored here, not displayed; the cell
# that builds `flattened_df` looks like this output pasted in — confirm.
flat_code = nested_md.generate_code()

In [None]:
# Flatten the parsed `payload` struct into top-level, `payload_`-prefixed columns.
# Each entry is (output column, expression). Order matters: later entries read
# columns created by earlier ones (e.g. `payload_data.fact` needs the exploded
# `payload_data`).
# NOTE(review): the two explodes run one after the other, so each input row
# yields len(payload.data) x len(payload.links) output rows, and `F.explode`
# drops rows whose array is null or empty. Confirm this is the intended shape.
flatten_steps = [
    ("payload_current_page", F.col("payload.current_page").cast("bigint")),
    ("payload_data", F.explode("payload.data")),
    ("payload_data_fact", F.col("payload_data.fact").cast("string")),
    ("payload_data_length", F.col("payload_data.length").cast("bigint")),
    ("payload_first_page_url", F.col("payload.first_page_url").cast("string")),
    ("payload_from", F.col("payload.from").cast("bigint")),
    ("payload_last_page", F.col("payload.last_page").cast("bigint")),
    ("payload_last_page_url", F.col("payload.last_page_url").cast("string")),
    ("payload_links", F.explode("payload.links")),
    ("payload_links_active", F.col("payload_links.active").cast("boolean")),
    ("payload_links_label", F.col("payload_links.label").cast("string")),
    ("payload_links_url", F.col("payload_links.url").cast("string")),
    ("payload_next_page_url", F.col("payload.next_page_url").cast("string")),
    ("payload_path", F.col("payload.path").cast("string")),
    ("payload_per_page", F.col("payload.per_page").cast("bigint")),
    ("payload_prev_page_url", F.col("payload.prev_page_url").cast("string")),
    ("payload_to", F.col("payload.to").cast("bigint")),
    ("payload_total", F.col("payload.total").cast("bigint")),
]

# Apply the steps one at a time, starting from the typed frame.
staged_df = processed_df
for output_name, expression in flatten_steps:
    staged_df = staged_df.withColumn(output_name, expression)

# Keep the row identifiers plus the flattened columns, in step order; the
# original nested `payload` column is not selected.
flattened_df = staged_df.select(
    ["id", "ts"] + [step_name for step_name, _ in flatten_steps]
)

flattened_df.show()
