In [1]:

!pip -q install pyspark==3.5.1 mlflow==2.14.1

from pyspark.sql import SparkSession
spark = (SparkSession.builder
         .appName("ISM6362-Final")
         .getOrCreate())
spark


[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m61.0/61.0 kB[0m [31m5.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m25.8/25.8 MB[0m [31m48.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m247.0/247.0 kB[0m [31m22.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m147.8/147.8 kB[0m [31m14.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m114.9/114.9 kB[0m [31m12.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m84.4/84.4 kB[0m [31m9.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m18.3/18.3 MB[0m [31m67.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m65.6/65.6 kB[0m [31m6.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [2]:
from pyspark.sql import functions as F, types as T
from datetime import datetime, timedelta
import os, json, random

base_dir = "/content/beatblast"
raw_dir = f"{base_dir}/raw"
bronze_dir = f"{base_dir}/bronze"
silver_dir = f"{base_dir}/silver"
gold_dir   = f"{base_dir}/gold"
chk_dir    = f"{base_dir}/_chk"

for d in [raw_dir, bronze_dir, silver_dir, gold_dir, chk_dir]:
    os.makedirs(d, exist_ok=True)

print("Base:", base_dir)


Base: /content/beatblast


In [3]:
users = [f"u{i:03d}" for i in range(1, 301)]
regions = ["US", "EU", "IN", "APAC"]
devices = ["iOS", "Android", "Web"]
event_types = ["play", "skip", "like", "error"]

now = datetime.utcnow()
days_back = 15

def make_event(ts, user):
    e = random.choices(event_types, weights=[75, 18, 5, 2])[0]
    return {
        "event_ts": ts.strftime("%Y-%m-%d %H:%M:%S"),
        "user_id": user,
        "event_type": e,
        "track_id": f"t{random.randint(1,5000)}",
        "artist_id": f"a{random.randint(1,800)}",
        "region": random.choice(regions),
        "device": random.choice(devices),
        "play_ms": random.randint(1000, 240000) if e in ["play","like"] else None
    }

for day in range(days_back):
    ts_day = now - timedelta(days=day)
    events = []
    for _ in range(random.randint(1500, 2500)):
        user = random.choice(users)
        minute_offset = random.randint(0, 24*60-1)
        ts = ts_day.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(minutes=minute_offset)
        events.append(make_event(ts, user))
    out_path = os.path.join(raw_dir, f"events_{ts_day.strftime('%Y%m%d')}.json")
    with open(out_path, "w") as f:
        for e in events:
            f.write(json.dumps(e) + "\n")

len(os.listdir(raw_dir))


15

In [4]:
event_schema = T.StructType([
    T.StructField("event_ts",  T.StringType(), True),
    T.StructField("user_id",   T.StringType(), True),
    T.StructField("event_type",T.StringType(), True),
    T.StructField("track_id",  T.StringType(), True),
    T.StructField("artist_id", T.StringType(), True),
    T.StructField("region",    T.StringType(), True),
    T.StructField("device",    T.StringType(), True),
    T.StructField("play_ms",   T.LongType(), True),
])

bronze_df = (spark.read
    .schema(event_schema)
    .json(raw_dir))

bronze_df = bronze_df.withColumn("event_ts", F.to_timestamp("event_ts"))
bronze_df.write.mode("overwrite").parquet(bronze_dir)

print("Bronze rows:", spark.read.parquet(bronze_dir).count())


Bronze rows: 28929


In [5]:
bronze = spark.read.parquet(bronze_dir)

valid_types = ["play","skip","like","error"]
silver = (bronze
    .filter(F.col("event_ts").isNotNull() & F.col("user_id").isNotNull())
    .filter(F.col("event_type").isin(valid_types))
    .withColumn("date", F.to_date("event_ts"))
)

silver.write.mode("overwrite").parquet(silver_dir)
print("Silver rows:", spark.read.parquet(silver_dir).count())

Silver rows: 28929


In [6]:
silver = spark.read.parquet(silver_dir)


dau = (silver
    .select("date","user_id")
    .dropna()
    .dropDuplicates(["date","user_id"])
    .groupBy("date").agg(F.countDistinct("user_id").alias("dau")))
dau_path = f"{gold_dir}/dau_daily"
dau.write.mode("overwrite").parquet(dau_path)

active_daily = (silver
    .select("user_id","date")
    .dropna()
    .dropDuplicates(["user_id","date"]))

base = active_daily.withColumnRenamed("date","d0")
future = active_daily.withColumnRenamed("date","future_date")

retention = (base.alias("b")
    .join(future.alias("f"),
          (F.col("b.user_id")==F.col("f.user_id")) &
          (F.col("f.future_date") > F.col("b.d0")) &
          (F.col("f.future_date") <= F.col("b.d0") + F.expr("INTERVAL 7 DAYS")),
          "left")
    .groupBy("b.d0")
    .agg(
        F.countDistinct("b.user_id").alias("d0_users"),
        F.countDistinct(F.when(F.col("f.future_date").isNotNull(), F.col("b.user_id"))).alias("returned_1_7")
    )
    .withColumn("retention_7d", F.round(F.col("returned_1_7")/F.col("d0_users"), 4))
    .withColumnRenamed("b.d0","date"))
ret_path = f"{gold_dir}/retention_7d"
retention.write.mode("overwrite").parquet(ret_path)


plays = F.sum(F.when(F.col("event_type")=="play", 1).otherwise(0)).alias("plays")
skips = F.sum(F.when(F.col("event_type")=="skip", 1).otherwise(0)).alias("skips")

skip_rate = (silver
    .groupBy("date")
    .agg(plays, skips)
    .withColumn("skip_rate",
                F.round(F.col("skips")/F.when(F.col("plays")>0, F.col("plays")).otherwise(None), 4)))
skip_path = f"{gold_dir}/skip_rate_daily"
skip_rate.write.mode("overwrite").parquet(skip_path)

print("Gold paths:", dau_path, ret_path, skip_path)

Gold paths: /content/beatblast/gold/dau_daily /content/beatblast/gold/retention_7d /content/beatblast/gold/skip_rate_daily


In [7]:
dau.orderBy("date").show(10, False)
retention.orderBy("date").show(10, False)
skip_rate.orderBy("date").show(10, False)

+----------+---+
|date      |dau|
+----------+---+
|2025-08-04|299|
|2025-08-05|300|
|2025-08-06|300|
|2025-08-07|300|
|2025-08-08|300|
|2025-08-09|298|
|2025-08-10|300|
|2025-08-11|299|
|2025-08-12|298|
|2025-08-13|299|
+----------+---+
only showing top 10 rows



AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `date` cannot be resolved. Did you mean one of the following? [`b`.`d0`, `d0_users`, `retention_7d`, `returned_1_7`].;
'Sort ['date ASC NULLS FIRST], true
+- Project [d0#181, d0_users#209L, returned_1_7#210L, round((cast(returned_1_7#210L as double) / cast(d0_users#209L as double)), 4) AS retention_7d#216]
   +- Aggregate [d0#181], [d0#181, count(distinct user_id#140) AS d0_users#209L, count(distinct CASE WHEN isnotnull(future_date#184) THEN user_id#140 END) AS returned_1_7#210L]
      +- Join LeftOuter, (((user_id#140 = user_id#188) AND (future_date#184 > d0#181)) AND (future_date#184 <= date_add(d0#181, extractansiintervaldays(INTERVAL '7' DAY))))
         :- SubqueryAlias b
         :  +- Project [user_id#140, date#147 AS d0#181]
         :     +- Deduplicate [user_id#140, date#147]
         :        +- Filter atleastnnonnulls(2, user_id#140, date#147)
         :           +- Project [user_id#140, date#147]
         :              +- Relation [event_ts#139,user_id#140,event_type#141,track_id#142,artist_id#143,region#144,device#145,play_ms#146L,date#147] parquet
         +- SubqueryAlias f
            +- Project [user_id#188, date#195 AS future_date#184]
               +- Deduplicate [user_id#188, date#195]
                  +- Filter atleastnnonnulls(2, user_id#188, date#195)
                     +- Project [user_id#188, date#195]
                        +- Relation [event_ts#187,user_id#188,event_type#189,track_id#190,artist_id#191,region#192,device#193,play_ms#194L,date#195] parquet


In [8]:
stream_silver_dir = f"{base_dir}/stream_silver"
os.makedirs(stream_silver_dir, exist_ok=True)

silver_static = spark.read.parquet(silver_dir)
dates = [r['date'] for r in silver_static.select("date").distinct().orderBy("date").limit(2).collect()]
subset = silver_static.filter(F.col("date").isin(dates))
subset.write.mode("overwrite").parquet(stream_silver_dir)

alerts_out = f"{gold_dir}/alerts_error_rate"
alerts_chk = f"{chk_dir}/alerts"

silver_stream = (spark.readStream
    .schema(subset.schema)
    .parquet(stream_silver_dir))

alerts = (silver_stream
    .withWatermark("event_ts", "1 hour")
    .groupBy(
        F.window("event_ts", "10 minutes", "5 minutes").alias("w"),
        F.col("region")
    )
    .agg(
        F.sum(F.when(F.col("event_type")=="error", 1).otherwise(0)).alias("errors"),
        F.sum(F.when(F.col("event_type")=="play", 1).otherwise(0)).alias("plays")
    )
    .withColumn("error_rate",
                F.col("errors") / F.when(F.col("plays")+F.col("errors") > 0,
                                         F.col("plays")+F.col("errors")).otherwise(None))
    .filter(F.col("error_rate") > F.lit(0.05))
    .select(
        F.col("region"),
        F.col("w.start").alias("window_start"),
        F.col("w.end").alias("window_end"),
        F.round(F.col("error_rate"), 4).alias("error_rate")
    )
)

q = (alerts.writeStream
     .format("parquet")
     .option("checkpointLocation", alerts_chk)
     .option("path", alerts_out)
     .outputMode("append")
     .trigger(processingTime="5 seconds")
     .start())

q.awaitTermination(10)
q.stop()

spark.read.parquet(alerts_out).orderBy(F.col("window_start").desc()).show(10, False)

AnalysisException: Unable to infer schema for Parquet at . It must be specified manually.

In [9]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

events = spark.read.parquet(silver_dir).select("user_id","event_ts","event_type","play_ms")
events = events.withColumn("date", F.to_date("event_ts"))

user_day = (events
    .select("user_id","date")
    .dropna()
    .dropDuplicates(["user_id","date"])
)

future_activity = user_day.withColumnRenamed("date","future_date")
label_df = (user_day.alias("u")
    .join(future_activity.alias("f"),
          (F.col("u.user_id")==F.col("f.user_id")) &
          (F.col("f.future_date")>F.col("u.date")) &
          (F.col("f.future_date")<=F.col("u.date")+F.expr("INTERVAL 30 DAYS")),
          "left")
    .groupBy("u.user_id","u.date")
    .agg(F.max(F.when(F.col("f.future_date").isNotNull(), F.lit(1)).otherwise(F.lit(0))).alias("returned_30d"))
    .withColumn("label", (1 - F.col("returned_30d")).cast("int"))
)

feat_base = (events
    .withColumn("day", F.to_date("event_ts"))
    .select("user_id","day","event_type","play_ms")
)

window_feats = (feat_base.alias("e")
    .join(label_df.alias("l"),
          (F.col("e.user_id")==F.col("l.user_id")) &
          (F.col("e.day")<=F.col("l.date")) &
          (F.col("e.day")>=F.col("l.date")-F.expr("INTERVAL 14 DAYS")),
          "inner")
    .groupBy("l.user_id","l.date")
    .agg(
        F.sum(F.when(F.col("e.event_type")=="play",1).otherwise(0)).alias("plays_14d"),
        F.sum(F.when(F.col("e.event_type")=="skip",1).otherwise(0)).alias("skips_14d"),
        F.countDistinct("e.day").alias("active_days_14d"),
        F.sum(F.coalesce(F.col("e.play_ms"),F.lit(0))).alias("play_ms_14d")
    )
)

dataset = (label_df
    .join(window_feats, on=["user_id","date"], how="left")
    .fillna({"plays_14d":0,"skips_14d":0,"active_days_14d":0,"play_ms_14d":0})
)

quant = dataset.approxQuantile("date", [0.8], 0.01)[0]
train = dataset.filter(F.col("date") <= F.lit(quant))
test  = dataset.filter(F.col("date") >  F.lit(quant))

assembler = VectorAssembler(
    inputCols=["plays_14d","skips_14d","active_days_14d","play_ms_14d"],
    outputCol="features"
)
lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=50)
pipeline = Pipeline(stages=[assembler, lr])

model = pipeline.fit(train)
preds = model.transform(test)

auc = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC").evaluate(preds)
print("Test AUC:", round(auc, 4))

IllegalArgumentException: requirement failed: Quantile calculation for column date with data type DateType is not supported.

In [10]:
from pyspark.sql import functions as F


dataset = dataset.withColumn("date_ts", F.col("date").cast("timestamp").cast("long"))


quant = dataset.approxQuantile("date_ts", [0.8], 0.01)[0]

train = dataset.filter(F.col("date_ts") <= F.lit(quant))
test  = dataset.filter(F.col("date_ts") >  F.lit(quant))


In [13]:

from pyspark.sql import functions as F
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

events = spark.read.parquet(silver_dir).select("user_id","event_ts","event_type","play_ms")
events = events.withColumn("date", F.to_date("event_ts"))

user_day = (events
    .select("user_id","date")
    .dropna()
    .dropDuplicates(["user_id","date"])
)

future_activity = user_day.withColumnRenamed("date","future_date")
label_df = (user_day.alias("u")
    .join(future_activity.alias("f"),
          (F.col("u.user_id")==F.col("f.user_id")) &
          (F.col("f.future_date")>F.col("u.date")) &
          (F.col("f.future_date")<=F.col("u.date")+F.expr("INTERVAL 30 DAYS")),
          "left")
    .groupBy("u.user_id","u.date")
    .agg(F.max(F.when(F.col("f.future_date").isNotNull(), F.lit(1)).otherwise(F.lit(0))).alias("returned_30d"))
    .withColumn("label", (1 - F.col("returned_30d")).cast("int"))
)


feat_base = (events
    .withColumn("day", F.to_date("event_ts"))
    .select("user_id","day","event_type","play_ms")
)

window_feats = (feat_base.alias("e")
    .join(label_df.alias("l"),
          (F.col("e.user_id")==F.col("l.user_id")) &
          (F.col("e.day")<=F.col("l.date")) &
          (F.col("e.day")>=F.col("l.date")-F.expr("INTERVAL 14 DAYS")),
          "inner")
    .groupBy("l.user_id","l.date")
    .agg(
        F.sum(F.when(F.col("e.event_type")=="play",1).otherwise(0)).alias("plays_14d"),
        F.sum(F.when(F.col("e.event_type")=="skip",1).otherwise(0)).alias("skips_14d"),
        F.countDistinct("e.day").alias("active_days_14d"),
        F.sum(F.coalesce(F.col("e.play_ms"),F.lit(0))).alias("play_ms_14d")
    )
)

dataset = (label_df
    .join(window_feats, on=["user_id","date"], how="left")
    .fillna({"plays_14d":0,"skips_14d":0,"active_days_14d":0,"play_ms_14d":0})
)


dataset = dataset.withColumn("date_ts", F.col("date").cast("timestamp").cast("long"))
quant = dataset.approxQuantile("date_ts", [0.8], 0.01)[0]
train = dataset.filter(F.col("date_ts") <= F.lit(quant))
test  = dataset.filter(F.col("date_ts") >  F.lit(quant))


assembler = VectorAssembler(
    inputCols=["plays_14d","skips_14d","active_days_14d","play_ms_14d"],
    outputCol="features"
)
lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=50)
pipeline = Pipeline(stages=[assembler, lr])

model = pipeline.fit(train)
preds = model.transform(test)


auc = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC").evaluate(preds)
print("Test AUC:", round(auc, 4))


IllegalArgumentException: requirement failed: rawPredictionCol vectors must have length=2, but got 1

In [14]:
from pyspark.sql import functions as F


def label_counts(df, name):
    cnts = df.groupBy("label").count().orderBy("label")
    print(f"{name} label distribution:")
    cnts.show()

dataset = dataset.withColumn("date_ts", F.col("date").cast("timestamp").cast("long"))


quant = dataset.approxQuantile("date_ts", [0.8], 0.01)[0]
train = dataset.filter(F.col("date_ts") <= F.lit(quant))
test  = dataset.filter(F.col("date_ts") >  F.lit(quant))

print("Initial (time-based) split:")
label_counts(train, "train")
label_counts(test, "test")


def has_two_classes(df):
    return df.select("label").distinct().count() >= 2

if not (has_two_classes(train) and has_two_classes(test)):
    print("⚠️ Time-based split produced a single class. Falling back to randomSplit 80/20.")
    train, test = dataset.randomSplit([0.8, 0.2], seed=42)
    label_counts(train, "train (random)")
    label_counts(test, "test (random)")


    if not (has_two_classes(train) and has_two_classes(test)):
        print("⚠️ Still single class. Relaxing churn horizon from 30d to 14d to increase positives.")

        future_activity_14 = (train.select("user_id","date")
                              .dropDuplicates(["user_id","date"])
                              .withColumnRenamed("date","future_date"))
        label_df_14 = (train.select("user_id","date").dropDuplicates()
            .alias("u").join(
                future_activity_14.alias("f"),
                (F.col("u.user_id")==F.col("f.user_id")) &
                (F.col("f.future_date")>F.col("u.date")) &
                (F.col("f.future_date")<=F.col("u.date")+F.expr("INTERVAL 14 DAYS")),
                "left")
            .groupBy("u.user_id","u.date")
            .agg(F.max(F.when(F.col("f.future_date").isNotNull(), F.lit(1)).otherwise(F.lit(0))).alias("returned_14d"))
            .withColumn("label", (1 - F.col("returned_14d")).cast("int"))
        )

        train = (label_df_14
                 .join(train.drop("label"), on=["user_id","date"], how="left")
                 .fillna({"plays_14d":0,"skips_14d":0,"active_days_14d":0,"play_ms_14d":0})
                 .withColumn("date_ts", F.col("date").cast("timestamp").cast("long")))

        test = dataset.sample(withReplacement=False, fraction=0.2, seed=99)
        test = test.fillna({"plays_14d":0,"skips_14d":0,"active_days_14d":0,"play_ms_14d":0})
        label_counts(train, "train (relaxed)")
        label_counts(test, "test (relaxed)")


Initial (time-based) split:
train label distribution:
+-----+-----+
|label|count|
+-----+-----+
|    0| 3591|
+-----+-----+

test label distribution:
+-----+-----+
|label|count|
+-----+-----+
|    0|  597|
|    1|  300|
+-----+-----+

⚠️ Time-based split produced a single class. Falling back to randomSplit 80/20.
train (random) label distribution:
+-----+-----+
|label|count|
+-----+-----+
|    0| 3400|
|    1|  233|
+-----+-----+

test (random) label distribution:
+-----+-----+
|label|count|
+-----+-----+
|    0|  788|
|    1|   67|
+-----+-----+



In [15]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

assembler = VectorAssembler(
    inputCols=["plays_14d","skips_14d","active_days_14d","play_ms_14d"],
    outputCol="features"
)
lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=50)
pipeline = Pipeline(stages=[assembler, lr])

model = pipeline.fit(train)
preds = model.transform(test)

auc = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC").evaluate(preds)
print("Test AUC:", round(auc, 4))


Test AUC: 0.9984


In [16]:
spark.read.parquet(f"{gold_dir}/dau_daily").orderBy("date").show(10, False)
spark.read.parquet(f"{gold_dir}/retention_7d").orderBy("date").show(10, False)
spark.read.parquet(f"{gold_dir}/skip_rate_daily").orderBy("date").show(10, False)


+----------+---+
|date      |dau|
+----------+---+
|2025-08-04|299|
|2025-08-05|300|
|2025-08-06|300|
|2025-08-07|300|
|2025-08-08|300|
|2025-08-09|298|
|2025-08-10|300|
|2025-08-11|299|
|2025-08-12|298|
|2025-08-13|299|
+----------+---+
only showing top 10 rows



AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `date` cannot be resolved. Did you mean one of the following? [`d0`, `d0_users`, `retention_7d`, `returned_1_7`].;
'Sort ['date ASC NULLS FIRST], true
+- Relation [d0#3928,d0_users#3929L,returned_1_7#3930L,retention_7d#3931] parquet


In [17]:
spark.read.parquet(f"{gold_dir}/alerts_error_rate").orderBy(F.col("window_start").desc()).show(10, False)


AnalysisException: Unable to infer schema for Parquet at . It must be specified manually.

In [18]:
spark.read.parquet(f"{gold_dir}/dau_daily").orderBy("date").show(10, False)


+----------+---+
|date      |dau|
+----------+---+
|2025-08-04|299|
|2025-08-05|300|
|2025-08-06|300|
|2025-08-07|300|
|2025-08-08|300|
|2025-08-09|298|
|2025-08-10|300|
|2025-08-11|299|
|2025-08-12|298|
|2025-08-13|299|
+----------+---+
only showing top 10 rows



In [22]:
spark.read.parquet(f"{gold_dir}/dau_daily").toPandas().to_csv("dau_daily.csv", index=False)
spark.read.parquet(f"{gold_dir}/retention_7d").toPandas().to_csv("retention_7d.csv", index=False)
spark.read.parquet(f"{gold_dir}/skip_rate_daily").toPandas().to_csv("skip_rate.csv", index=False)


ValueError: numpy.dtype size changed, may indicate binary incompatibility. Expected 96 from C header, got 88 from PyObject

In [23]:
import glob, shutil


spark.read.parquet(f"{gold_dir}/dau_daily")\
    .coalesce(1).write.mode("overwrite").option("header", "true")\
    .csv("/content/dau_daily_csv")

spark.read.parquet(f"{gold_dir}/retention_7d")\
    .coalesce(1).write.mode("overwrite").option("header", "true")\
    .csv("/content/retention_7d_csv")

spark.read.parquet(f"{gold_dir}/skip_rate_daily")\
    .coalesce(1).write.mode("overwrite").option("header", "true")\
    .csv("/content/skip_rate_csv")


part = glob.glob("/content/dau_daily_csv/part-*.csv")[0]
shutil.copy(part, "/content/dau_daily.csv")

part = glob.glob("/content/retention_7d_csv/part-*.csv")[0]
shutil.copy(part, "/content/retention_7d.csv")

part = glob.glob("/content/skip_rate_csv/part-*.csv")[0]
shutil.copy(part, "/content/skip_rate.csv")

print("Wrote CSVs: /content/dau_daily.csv, /content/retention_7d.csv, /content/skip_rate.csv")


Wrote CSVs: /content/dau_daily.csv, /content/retention_7d.csv, /content/skip_rate.csv


In [24]:
from google.colab import files
files.download("/content/dau_daily.csv")
files.download("/content/retention_7d.csv")
files.download("/content/skip_rate.csv")


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>