Import files

In [0]:
calendar = spark.read.csv("dbfs:/FileStore/predictive/calendar.csv", header = True)
sell_prices = spark.read.csv("dbfs:/FileStore/predictive/sell_prices.csv", header = True)
train_eva = spark.read.csv("dbfs:/FileStore/predictive/sales_train_evaluation.csv", header = True)


merge train_eva with calendar and sell price data

In [0]:
d_columns = [col for col in train_eva.columns if col.startswith("d_")]
n = len(d_columns)
stack_expr = ", ".join([f"'{c}', `{c}`" for c in d_columns])
id_cols = ["id", "item_id", "dept_id", "cat_id", "store_id", "state_id"]

train_eva_melted = train_eva.selectExpr(
    *id_cols,
    f"stack({n}, {stack_expr}) as (d, `sell units`)"
)


merge on calendar data, and sell price

In [0]:
df = train_eva_melted.join(calendar,on="d", how="left")

In [0]:
full_df = df.join(sell_prices, on = ["store_id","item_id","wm_yr_wk"], how= "left")
full_df.cache()

Out[4]: DataFrame[store_id: string, item_id: string, wm_yr_wk: string, d: string, id: string, dept_id: string, cat_id: string, state_id: string, sell units: string, date: string, weekday: string, wday: string, month: string, year: string, event_name_1: string, event_type_1: string, event_name_2: string, event_type_2: string, snap_CA: string, snap_TX: string, snap_WI: string, sell_price: string]

use only 2 years (730 days) data to train and predict

In [0]:
train_input =  full_df.filter((full_df["d"] == "d_423")) 
train_input

Out[21]: DataFrame[store_id: string, item_id: string, wm_yr_wk: string, d: string, id: string, dept_id: string, cat_id: string, state_id: string, sell units: string, date: string, weekday: string, wday: string, month: string, year: string, event_name_1: string, event_type_1: string, event_name_2: string, event_type_2: string, snap_CA: string, snap_TX: string, snap_WI: string, sell_price: string]

In [0]:
train_input.limit(1).display()

store_id,item_id,wm_yr_wk,d,id,dept_id,cat_id,state_id,sell units,date,weekday,wday,month,year,event_name_1,event_type_1,event_name_2,event_type_2,snap_CA,snap_TX,snap_WI,sell_price
CA_1,FOODS_2_004,11209,d_423,FOODS_2_004_CA_1_evaluation,FOODS_2,FOODS,CA,0,2012-03-26,Monday,3,3,2012,,,,,0,0,0,


In [0]:
#d_423 = 2012-03-26, d_1153 = 2014-03-26, d_1154 = 2014-03-27, d_1182 = 2014-04-24 , d_1183 = 2014-04-25, d_1913 = 2016-04-24, d_1914 =2016-04-25  , d_1941 = 2016-05-22, d_1211 = 2014-05-23
train_input =  full_df.filter((full_df["date"] >= "2012-03-26") & (full_df["date"] <= "2014-03-26"))   #d_423 ~ d_1153  
train_target =  full_df.filter((full_df["date"] >= "2014-03-27") & (full_df["date"] <= "2014-04-24")) #d_1154 ~ d_1182
test_input = full_df.filter((full_df["date"] >= "2014-04-25") & (full_df["date"] <= "2016-04-24"))  #d_1183 ~ d_1913  
test_target =  full_df.filter((full_df["date"] >= "2016-04-25") & (full_df["date"] <= "2016-05-22"))   #d_1914 ~ d_1941
final_input = full_df.filter((full_df["date"] >= "2014-05-23") & (full_df["date"] <= "2016-05-22")) #d_1211~ d_1941

## Feature engineering

**3. Price Features**

In [0]:
# Price change from previous time
from pyspark.sql.window import Window
from pyspark.sql.functions import col, lag

# Define window
price_window = Window.partitionBy("store_id", "item_id").orderBy("date")

# Calculate percent change
train_input = train_input.withColumn(
    "price_change",
    (col("sell_price") - lag("sell_price").over(price_window)) / lag("sell_price").over(price_window)
)

test_input = test_input.withColumn(
    "price_change",
    (col("sell_price") - lag("sell_price").over(price_window)) / lag("sell_price").over(price_window)
)

final_input = final_input.withColumn(
    "price_change",
    (col("sell_price") - lag("sell_price").over(price_window)) / lag("sell_price").over(price_window)
)


In [0]:
# Normalized price (current / max historical)   #use train data's scaler to prevent data leakage!!
from pyspark.sql.window import Window
from pyspark.sql.functions import col, max as spark_max
# Define the window
price_window = Window.partitionBy("store_id", "item_id")
# Get price_max only from the training data
train_input = train_input.withColumn("price_max", spark_max("sell_price").over(price_window))
train_input = train_input.withColumn("price_norm", col("sell_price")/ col("price_max"))

# Extract the max price per (store_id, item_id) from training data
price_scaler = train_input.select("store_id", "item_id", "price_max").dropDuplicates()

# Join the scaler to test and final data
test_input = test_input.join(price_scaler, on=["store_id", "item_id"], how="left")
test_input = test_input.withColumn("price_norm", col("sell_price") / col("price_max"))

final_input = final_input.join(price_scaler, on=["store_id", "item_id"], how="left")
final_input = final_input.withColumn("price_norm", col("sell_price") / col("price_max"))

**4. Time/Calendar Features** \
Useful for capturing weekly or monthly trends, holidays, etc.

In [0]:
from pyspark.sql.functions import col, dayofweek, weekofyear, month, year, when

train_input = train_input.withColumn("date", col("date").cast("date"))
train_input = train_input.withColumn("dayofweek",dayofweek(col("date"))-1) # 0=monday
#week number of the year
train_input = train_input.withColumn("week", weekofyear(col("date")))
train_input = train_input.withColumn("month", month(col("date")))
train_input = train_input.withColumn("year", year(col("date")))
train_input = train_input.withColumn("is_weekend", when(col("dayofweek")>=5, 1).otherwise(0))

test_input = test_input.withColumn("date", col("date").cast("date"))
test_input = test_input.withColumn("dayofweek",dayofweek(col("date"))-1) # 0=monday
#week number of the year
test_input = test_input.withColumn("week", weekofyear(col("date")))
test_input = test_input.withColumn("month", month(col("date")))
test_input = test_input.withColumn("year", year(col("date")))
test_input = test_input.withColumn("is_weekend", when(col("dayofweek")>=5, 1).otherwise(0))

final_input = final_input.withColumn("date", col("date").cast("date"))
final_input = final_input.withColumn("dayofweek",dayofweek(col("date"))-1) # 0=monday
#week number of the year
final_input = final_input.withColumn("week", weekofyear(col("date")))
final_input = final_input.withColumn("month", month(col("date")))
final_input = final_input.withColumn("year", year(col("date")))
final_input = final_input.withColumn("is_weekend", when(col("dayofweek")>=5, 1).otherwise(0))



**5. Demand Pattern Features**

In [0]:
train_input.cache()

Out[28]: DataFrame[store_id: string, item_id: string, wm_yr_wk: string, d: string, id: string, dept_id: string, cat_id: string, state_id: string, sell units: string, date: date, weekday: string, wday: string, month: int, year: int, event_name_1: string, event_type_1: string, event_name_2: string, event_type_2: string, snap_CA: string, snap_TX: string, snap_WI: string, sell_price: string, price_change: double, price_max: string, price_norm: double, dayofweek: int, week: int, is_weekend: int]

In [0]:
test_input.cache()

Out[29]: DataFrame[store_id: string, item_id: string, wm_yr_wk: string, d: string, id: string, dept_id: string, cat_id: string, state_id: string, sell units: string, date: date, weekday: string, wday: string, month: int, year: int, event_name_1: string, event_type_1: string, event_name_2: string, event_type_2: string, snap_CA: string, snap_TX: string, snap_WI: string, sell_price: string, price_change: double, price_max: string, price_norm: double, dayofweek: int, week: int, is_weekend: int]

In [0]:
final_input.cache()

Out[30]: DataFrame[store_id: string, item_id: string, wm_yr_wk: string, d: string, id: string, dept_id: string, cat_id: string, state_id: string, sell units: string, date: date, weekday: string, wday: string, month: int, year: int, event_name_1: string, event_type_1: string, event_name_2: string, event_type_2: string, snap_CA: string, snap_TX: string, snap_WI: string, sell_price: string, price_change: double, price_max: string, price_norm: double, dayofweek: int, week: int, is_weekend: int]

In [0]:
from pyspark.sql.functions import min as spark_min

First_sale_day = full_df.groupBy("id").agg(spark_min("date").alias("first_sale_day"))

In [0]:
from pyspark.sql.functions import to_date, col, datediff
train_input = train_input.join(First_sale_day, how = "left", on = "id")
train_input = train_input.withColumn("first_sale_day", to_date("first_sale_day"))
train_input = train_input.withColumn("days_since_first_sale", datediff(col("date") , col("first_sale_day")))

test_input = test_input.join(First_sale_day, how = "left", on = "id")
test_input = test_input.withColumn("first_sale_day", to_date("first_sale_day"))
test_input = test_input.withColumn("days_since_first_sale", datediff(col("date") , col("first_sale_day")))

final_input = final_input.join(First_sale_day, how = "left", on = "id")
final_input = final_input.withColumn("first_sale_day", to_date("first_sale_day"))
final_input = final_input.withColumn("days_since_first_sale", datediff(col("date") , col("first_sale_day")))

**6. Categorical Encodings**

In [0]:
from pyspark.sql.functions import when, col

train_input = train_input.withColumn("snap", when(
    ((col("state_id") == "CA") & (col("snap_CA").cast("int") == 1)) |
    ((col("state_id") == "TX") & (col("snap_TX").cast("int") == 1)) |
    ((col("state_id") == "WI") & (col("snap_WI").cast("int") == 1)),
    1).otherwise(0))

test_input = test_input.withColumn("snap", when(
    ((col("state_id") == "CA") & (col("snap_CA").cast("int") == 1)) |
    ((col("state_id") == "TX") & (col("snap_TX").cast("int") == 1)) |
    ((col("state_id") == "WI") & (col("snap_WI").cast("int") == 1)),
    1).otherwise(0))

final_input = final_input.withColumn("snap", when(
    ((col("state_id") == "CA") & (col("snap_CA").cast("int") == 1)) |
    ((col("state_id") == "TX") & (col("snap_TX").cast("int") == 1)) |
    ((col("state_id") == "WI") & (col("snap_WI").cast("int") == 1)),
    1).otherwise(0))



select features we need

In [0]:
train_input = train_input.select("store_id","item_id","d",  "cat_id", "state_id", "month",  "dayofweek", "is_weekend", "event_type_1","event_type_2", "snap", "price_norm","price_change", "first_sale_day","days_since_first_sale", "sell units")
test_input = test_input.select("store_id","item_id", "d",  "cat_id", "state_id", "month",  "dayofweek",  "is_weekend", "event_type_1", "event_type_2", "snap", "price_norm","price_change", "first_sale_day","days_since_first_sale", "sell units")
final_input = final_input.select("store_id","item_id", "d", "cat_id", "state_id", "month",  "dayofweek",  "is_weekend",  "event_type_1", "event_type_2", "snap", "price_norm","price_change", "first_sale_day","days_since_first_sale", "sell units")

one hot encode the categorical features

In [0]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline

categorical_cols = [ "cat_id", "state_id", "month", "dayofweek", "event_type_1", "event_type_2"]


# Step 1: Create indexers for each column
indexers = [
    StringIndexer(inputCol=col, outputCol=col + "_index", handleInvalid="keep")
    for col in categorical_cols
]

# Step 2: Create one-hot encoders for each indexed column
encoders = [
    OneHotEncoder(inputCol=col + "_index", outputCol=col + "_ohe")
    for col in categorical_cols
]

# Step 3: Combine into a pipeline
pipeline = Pipeline(stages=indexers + encoders)

# Step 4: Fit and transform your dataset
model = pipeline.fit(train_input)
train_input_encoded = model.transform(train_input)
test_input_encoded = model.transform(test_input)
final_input_encoded = model.transform(final_input)


In [0]:
train_input.select("month").distinct().show()

+-----+
|month|
+-----+
|   12|
|    1|
|    6|
|    3|
|    5|
|    9|
|    4|
|    8|
|    7|
|   10|
|   11|
|    2|
+-----+



In [0]:
train_input_encoded.select("month_ohe").limit(1).display()

month_ohe
"Map(vectorType -> sparse, length -> 12, indices -> List(0), values -> List(1.0))"


In [0]:
train_input_encoded.printSchema()

root
 |-- store_id: string (nullable = true)
 |-- item_id: string (nullable = true)
 |-- d: string (nullable = true)
 |-- cat_id: string (nullable = true)
 |-- state_id: string (nullable = true)
 |-- month: integer (nullable = true)
 |-- dayofweek: integer (nullable = true)
 |-- is_weekend: integer (nullable = false)
 |-- event_type_1: string (nullable = true)
 |-- event_type_2: string (nullable = true)
 |-- snap: integer (nullable = false)
 |-- price_norm: double (nullable = true)
 |-- price_change: double (nullable = true)
 |-- first_sale_day: date (nullable = true)
 |-- days_since_first_sale: integer (nullable = true)
 |-- sell units: string (nullable = true)
 |-- cat_id_index: double (nullable = false)
 |-- state_id_index: double (nullable = false)
 |-- month_index: double (nullable = false)
 |-- dayofweek_index: double (nullable = false)
 |-- event_type_1_index: double (nullable = false)
 |-- event_type_2_index: double (nullable = false)
 |-- cat_id_ohe: vector (nullable = true)
 

In [0]:
from pyspark.sql.functions import col
from pyspark.ml.functions import vector_to_array

def expand_vector_column(df, vector_col, prefix):
    """Correct way to split a one-hot encoded vector column into individual columns using vector_to_array."""
    array_col = vector_col + "_arr"
    df = df.withColumn(array_col, vector_to_array(col(vector_col)))
    vector_size = len(df.select(array_col).first()[0])  # fixed this line
    for i in range(vector_size):
        df = df.withColumn(f"{prefix}_{i}", col(array_col)[i])
    return df.drop(array_col)

# Example application
columns_to_expand = ["cat_id_ohe", "state_id_ohe", "month_ohe", "dayofweek_ohe", "event_type_1_ohe", "event_type_2_ohe"]

train_input_expand = train_input_encoded
for col_name in columns_to_expand:
    train_input_expand = expand_vector_column(train_input_expand, col_name, col_name)

test_input_expand = test_input_encoded
for col_name in columns_to_expand:
    test_input_expand = expand_vector_column(test_input_expand, col_name, col_name)

final_input_expand = final_input_encoded
for col_name in columns_to_expand:
    final_input_expand = expand_vector_column(final_input_expand, col_name, col_name)


In [0]:
train_input_expand.printSchema()

root
 |-- store_id: string (nullable = true)
 |-- item_id: string (nullable = true)
 |-- d: string (nullable = true)
 |-- cat_id: string (nullable = true)
 |-- state_id: string (nullable = true)
 |-- month: integer (nullable = true)
 |-- dayofweek: integer (nullable = true)
 |-- is_weekend: integer (nullable = false)
 |-- event_type_1: string (nullable = true)
 |-- event_type_2: string (nullable = true)
 |-- snap: integer (nullable = false)
 |-- price_norm: double (nullable = true)
 |-- price_change: double (nullable = true)
 |-- first_sale_day: date (nullable = true)
 |-- days_since_first_sale: integer (nullable = true)
 |-- sell units: string (nullable = true)
 |-- cat_id_index: double (nullable = false)
 |-- state_id_index: double (nullable = false)
 |-- month_index: double (nullable = false)
 |-- dayofweek_index: double (nullable = false)
 |-- event_type_1_index: double (nullable = false)
 |-- event_type_2_index: double (nullable = false)
 |-- cat_id_ohe: vector (nullable = true)
 

select features we need

In [0]:
train_input_expand_1 = train_input_expand.drop(
    "cat_id_index", "state_id_index", "month_index", "dayofweek_index",
    "event_type_1_index", "event_type_2_index",
    "cat_id_ohe", "state_id_ohe", "month_ohe", "dayofweek_ohe",
    "event_type_1_ohe", "event_type_2_ohe",  "cat_id", "state_id", "month", "dayofweek", "event_type_1", "event_type_2"
)

test_input_expand_1 = test_input_expand.drop(
    "cat_id_index", "state_id_index", "month_index", "dayofweek_index",
    "event_type_1_index", "event_type_2_index",
    "cat_id_ohe", "state_id_ohe", "month_ohe", "dayofweek_ohe",
    "event_type_1_ohe", "event_type_2_ohe",  "cat_id", "state_id", "month", "dayofweek", "event_type_1", "event_type_2"
)

final_input_expand_1 = final_input_expand.drop(
    "cat_id_index", "state_id_index", "month_index", "dayofweek_index",
    "event_type_1_index", "event_type_2_index",
    "cat_id_ohe", "state_id_ohe", "month_ohe", "dayofweek_ohe",
    "event_type_1_ohe", "event_type_2_ohe",  "cat_id", "state_id", "month", "dayofweek", "event_type_1", "event_type_2"
)



normalized price_change, days_since_first_sale, sell units

In [0]:
from pyspark.sql import functions as F

# List of columns you want to standardize
inputCols = ["price_change", "days_since_first_sale", "sell units"]

# Initialize the DataFrames for transformation
train_input_norm = train_input_expand_1
test_input_norm = test_input_expand_1
final_input_norm = final_input_expand_1

# For each column, compute mean and stddev
for col in inputCols:
    # Compute mean and standard deviation for the column
    mean = train_input_norm.agg({col: "mean"}).collect()[0][0]
    stddev = train_input_norm.agg({col: "stddev"}).collect()[0][0]
    
    # Apply standardization to the train, test, and final data
    train_input_norm = train_input_norm.withColumn(col + "_scaled", (F.col(col) - mean) / stddev)
    test_input_norm = test_input_norm.withColumn(col + "_scaled", (F.col(col) - mean) / stddev)
    final_input_norm = final_input_norm.withColumn(col + "_scaled", (F.col(col) - mean) / stddev)


In [0]:
train_input_norm_1 = train_input_norm.drop(
    "price_change","sell units"
)
test_input_norm_1 = test_input_norm.drop(
    "price_change","sell units"
)
final_input_norm_1 = final_input_norm.drop(
    "price_change","sell units"
)



In [0]:
train_target = train_target.select("store_id", "item_id", "date","sell units")
test_target = test_target.select("store_id", "item_id", "date","sell units")

In [0]:
train_input_norm_1.write.parquet("/dbfs/FileStore/predictive/train_input.parquet")
test_input_norm_1.write.parquet("/dbfs/FileStore/predictive/test_input.parquet")
final_input_norm_1.write.parquet("/dbfs/FileStore/predictive/final_input.parquet")
train_target.write.parquet("/dbfs/FileStore/predictive/train_target.parquet")
test_target.write.parquet("/dbfs/FileStore/predictive/test_target.parquet")

turn features to -10 if the item didn't start to sell. use padding mask and set special value = -10 later in training process

In [0]:
from pyspark.sql import functions as F

# List of columns you want to check for null or pad with -10
columns_to_check = [
 "is_weekend", "snap", "price_norm",
    "cat_id_ohe_0", "cat_id_ohe_1", "cat_id_ohe_2",
    "state_id_ohe_0", "state_id_ohe_1", "state_id_ohe_2",
    "month_ohe_0", "month_ohe_1", "month_ohe_2", "month_ohe_3",
    "month_ohe_4", "month_ohe_5", "month_ohe_6", "month_ohe_7",
    "month_ohe_8", "month_ohe_9", "month_ohe_10", "month_ohe_11",
    "dayofweek_ohe_0", "dayofweek_ohe_1", "dayofweek_ohe_2",
    "dayofweek_ohe_3", "dayofweek_ohe_4", "dayofweek_ohe_5", "dayofweek_ohe_6",
    "event_type_1_ohe_0", "event_type_1_ohe_1", "event_type_1_ohe_2", "event_type_1_ohe_3",
    "event_type_2_ohe_0",
    "price_change_scaled", "days_since_first_sale_scaled", "sell units_scaled"
]
pad_train_input = train_input_norm_1
pad_test_input = test_input_norm_1
pad_final_input = final_input_norm_1

# Apply this condition to all columns
for col in columns_to_check:
    pad_train_input = pad_train_input.withColumn(col, F.when(F.col("days_since_first_sale").cast("int") == 0  , -10).otherwise(F.col(col)))
    pad_test_input = pad_test_input.withColumn(col, F.when(F.col("days_since_first_sale").cast("int")== 0, -10).otherwise(F.col(col)))
    pad_final_input = pad_final_input.withColumn(col, F.when(F.col("days_since_first_sale").cast("int") == 0, -10).otherwise(F.col(col)))


In [0]:
pad_train_input = pad_train_input.drop("first_sale_day","days_since_first_sale")
pad_test_input = pad_test_input.drop("first_sale_day","days_since_first_sale")
pad_final_input = pad_final_input.drop("first_sale_day","days_since_first_sale")

In [0]:
pad_train_input.write.parquet("/dbfs/FileStore/predictive/train_input_pad.parquet")
pad_test_input.write.parquet("/dbfs/FileStore/predictive/test_input_pad.parquet")
pad_final_input.write.parquet("/dbfs/FileStore/predictive/final_input_pad.parquet")

In [0]:
%fs ls dbfs:/dbfs/FileStore/predictive

path,name,size,modificationTime
dbfs:/dbfs/FileStore/predictive/final_input.parquet/,final_input.parquet/,0,0
dbfs:/dbfs/FileStore/predictive/final_input_pad.parquet/,final_input_pad.parquet/,0,0
dbfs:/dbfs/FileStore/predictive/test_input.parquet/,test_input.parquet/,0,0
dbfs:/dbfs/FileStore/predictive/test_input_pad.parquet/,test_input_pad.parquet/,0,0
dbfs:/dbfs/FileStore/predictive/test_target.parquet/,test_target.parquet/,0,0
dbfs:/dbfs/FileStore/predictive/train_input.parquet/,train_input.parquet/,0,0
dbfs:/dbfs/FileStore/predictive/train_input_pad.parquet/,train_input_pad.parquet/,0,0
dbfs:/dbfs/FileStore/predictive/train_target.parquet/,train_target.parquet/,0,0


In [0]:
# dbutils.fs.cp("dbfs:/dbfs/FileStore/predictive/final_input_pad.parquet", "dbfs:/FileStore/predictive_clean/final_input_pad.parquet", recurse=True)
# dbutils.fs.cp("dbfs:/dbfs/FileStore/predictive/test_input_pad.parquet", "dbfs:/FileStore/predictive_clean/test_input_pad.parquet", recurse=True)
# dbutils.fs.cp("dbfs:/dbfs/FileStore/predictive/train_input_pad.parquet", "dbfs:/FileStore/predictive_clean/train_input_pad.parquet", recurse=True)
# dbutils.fs.cp("dbfs:/dbfs/FileStore/predictive/train_target.parquet", "dbfs:/FileStore/predictive_clean/train_target.parquet", recurse=True)
# dbutils.fs.cp("dbfs:/dbfs/FileStore/predictive/test_target.parquet", "dbfs:/FileStore/predictive_clean/test_target.parquet", recurse=True)

dbutils.fs.cp("dbfs:/FileStore/predictive_clean/test_input_pad.parquet", 
              "file:/dbfs/FileStore/predictive_clean/test_input_pad.parquet", 
              recurse=True)
dbutils.fs.cp("dbfs:/FileStore/predictive_clean/train_input_pad.parquet", 
              "file:/dbfs/FileStore/predictive_clean/train_input_pad.parquet", 
              recurse=True)
dbutils.fs.cp("dbfs:/FileStore/predictive_clean/final_input_pad.parquet", 
              "file:/dbfs/FileStore/predictive_clean/final_input_pad.parquet", 
              recurse=True)
dbutils.fs.cp("dbfs:/FileStore/predictive_clean/test_target.parquet", 
              "file:/dbfs/FileStore/predictive_clean/test_target.parquet", 
              recurse=True)
dbutils.fs.cp("dbfs:/FileStore/predictive_clean/train_target.parquet", 
              "file:/dbfs/FileStore/predictive_clean/train_target.parquet", 
              recurse=True)



Out[39]: True

In [0]:
%fs ls /FileStore/predictive_clean/

path,name,size,modificationTime
dbfs:/FileStore/predictive_clean/final_input_pad.parquet/,final_input_pad.parquet/,0,0
dbfs:/FileStore/predictive_clean/test_input_pad.parquet/,test_input_pad.parquet/,0,0
dbfs:/FileStore/predictive_clean/test_target.parquet/,test_target.parquet/,0,0
dbfs:/FileStore/predictive_clean/train_input_pad.parquet/,train_input_pad.parquet/,0,0
dbfs:/FileStore/predictive_clean/train_target.parquet/,train_target.parquet/,0,0


In [0]:

%fs ls /FileStore/predictive_clean/test_input_pad.parquet/

path,name,size,modificationTime
dbfs:/FileStore/predictive_clean/test_input_pad.parquet/_SUCCESS,_SUCCESS,0,1746121954000
dbfs:/FileStore/predictive_clean/test_input_pad.parquet/_committed_6341098318129863703,_committed_6341098318129863703,1248,1746121954000
dbfs:/FileStore/predictive_clean/test_input_pad.parquet/_started_6341098318129863703,_started_6341098318129863703,0,1746121955000
dbfs:/FileStore/predictive_clean/test_input_pad.parquet/part-00000-tid-6341098318129863703-f4da18e6-eea9-4d99-8265-c704d5bd6593-10707-1-c000.snappy.parquet,part-00000-tid-6341098318129863703-f4da18e6-eea9-4d99-8265-c704d5bd6593-10707-1-c000.snappy.parquet,2823196,1746121955000
dbfs:/FileStore/predictive_clean/test_input_pad.parquet/part-00001-tid-6341098318129863703-f4da18e6-eea9-4d99-8265-c704d5bd6593-10708-1-c000.snappy.parquet,part-00001-tid-6341098318129863703-f4da18e6-eea9-4d99-8265-c704d5bd6593-10708-1-c000.snappy.parquet,2841911,1746121956000
dbfs:/FileStore/predictive_clean/test_input_pad.parquet/part-00002-tid-6341098318129863703-f4da18e6-eea9-4d99-8265-c704d5bd6593-10709-1-c000.snappy.parquet,part-00002-tid-6341098318129863703-f4da18e6-eea9-4d99-8265-c704d5bd6593-10709-1-c000.snappy.parquet,2930372,1746121957000
dbfs:/FileStore/predictive_clean/test_input_pad.parquet/part-00003-tid-6341098318129863703-f4da18e6-eea9-4d99-8265-c704d5bd6593-10710-1-c000.snappy.parquet,part-00003-tid-6341098318129863703-f4da18e6-eea9-4d99-8265-c704d5bd6593-10710-1-c000.snappy.parquet,2836645,1746121958000
dbfs:/FileStore/predictive_clean/test_input_pad.parquet/part-00004-tid-6341098318129863703-f4da18e6-eea9-4d99-8265-c704d5bd6593-10711-1-c000.snappy.parquet,part-00004-tid-6341098318129863703-f4da18e6-eea9-4d99-8265-c704d5bd6593-10711-1-c000.snappy.parquet,2878417,1746121959000
dbfs:/FileStore/predictive_clean/test_input_pad.parquet/part-00005-tid-6341098318129863703-f4da18e6-eea9-4d99-8265-c704d5bd6593-10712-1-c000.snappy.parquet,part-00005-tid-6341098318129863703-f4da18e6-eea9-4d99-8265-c704d5bd6593-10712-1-c000.snappy.parquet,2726292,1746121959000
dbfs:/FileStore/predictive_clean/test_input_pad.parquet/part-00006-tid-6341098318129863703-f4da18e6-eea9-4d99-8265-c704d5bd6593-10713-1-c000.snappy.parquet,part-00006-tid-6341098318129863703-f4da18e6-eea9-4d99-8265-c704d5bd6593-10713-1-c000.snappy.parquet,2772999,1746121960000


In [0]:
%sh
cd /dbfs/FileStore/predictive_clean
zip -r data.zip train_input_pad.parquet test_input_pad.parquet final_input_pad.parquet train_target.parquet test_target.parquet



updating: train_input_pad.parquet/ (stored 0%)
updating: train_input_pad.parquet/._started_1124793087610049014.crc (stored 0%)
updating: train_input_pad.parquet/part-00003-tid-1124793087610049014-a4f75e2b-f77d-460c-be87-4b0a26fcc4ad-10297-1-c000.snappy.parquet (deflated 68%)
updating: train_input_pad.parquet/.part-00010-tid-1124793087610049014-a4f75e2b-f77d-460c-be87-4b0a26fcc4ad-10304-1-c000.snappy.parquet.crc (deflated 0%)
updating: train_input_pad.parquet/part-00006-tid-1124793087610049014-a4f75e2b-f77d-460c-be87-4b0a26fcc4ad-10300-1-c000.snappy.parquet (deflated 67%)
updating: train_input_pad.parquet/.part-00011-tid-1124793087610049014-a4f75e2b-f77d-460c-be87-4b0a26fcc4ad-10305-1-c000.snappy.parquet.crc (deflated 0%)
updating: train_input_pad.parquet/._SUCCESS.crc (stored 0%)
updating: train_input_pad.parquet/.part-00007-tid-1124793087610049014-a4f75e2b-f77d-460c-be87-4b0a26fcc4ad-10301-1-c000.snappy.parquet.crc (deflated 0%)
updating: train_input_pad.parquet/part-00005-tid-1124793

In [0]:
%sh
unzip -l /dbfs/FileStore/predictive_clean/data.zip

Archive:  /dbfs/FileStore/predictive_clean/data.zip
  Length      Date    Time    Name
---------  ---------- -----   ----
        0  2025-05-01 18:20   train_input_pad.parquet/
        8  2025-05-01 18:20   train_input_pad.parquet/._started_1124793087610049014.crc
  2464532  2025-05-01 18:20   train_input_pad.parquet/part-00003-tid-1124793087610049014-a4f75e2b-f77d-460c-be87-4b0a26fcc4ad-10297-1-c000.snappy.parquet
    20292  2025-05-01 18:20   train_input_pad.parquet/.part-00010-tid-1124793087610049014-a4f75e2b-f77d-460c-be87-4b0a26fcc4ad-10304-1-c000.snappy.parquet.crc
  2594906  2025-05-01 18:20   train_input_pad.parquet/part-00006-tid-1124793087610049014-a4f75e2b-f77d-460c-be87-4b0a26fcc4ad-10300-1-c000.snappy.parquet
    19768  2025-05-01 18:20   train_input_pad.parquet/.part-00011-tid-1124793087610049014-a4f75e2b-f77d-460c-be87-4b0a26fcc4ad-10305-1-c000.snappy.parquet.crc
        8  2025-05-01 18:20   train_input_pad.parquet/._SUCCESS.crc
    20204  2025-05-01 18:20   train_input

In [0]:
%sh
cd /dbfs/FileStore/predictive_clean
ls



data.zip
final_input_pad.parquet
test_input_pad.parquet
test_input_pad.zip
test_target.parquet
train_input_pad.parquet
train_target.parquet


In [0]:
%fs cp file:/dbfs/FileStore/predictive_clean/data.zip dbfs:/FileStore/data.zip


In [0]:
#https://community.cloud.databricks.com/files/data.zip