In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql import Window
import glob
import os
import shutil

# Spark session shared by every cell below. "local[2]" runs Spark in-process
# with two worker threads; getOrCreate() reuses a session that is already
# running instead of starting a second one.
spark = SparkSession.builder.appName("Ticker Transformation").master("local[2]").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/19 08:31:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/09/19 08:31:27 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [2]:

# Load the index observations. The first CSV line supplies the column names
# and Spark infers each column's type from the file contents.
raw = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('./data/index.csv')
)

In [3]:
# Print the distinct (ticker, index name) pairs, sorted by ticker, without
# truncating long index names.
(
    raw.select('ticker', 'indexname')
    .distinct()
    .orderBy('ticker')
    .show(n=20, truncate=False)
)

+------+---------------------------------+
|ticker|indexname                        |
+------+---------------------------------+
|ADBE  |Global Transaction Index         |
|ADBE  |Global Transaction Index (Legacy)|
|DASH  |Global New Store Index           |
|DASH  |Global Order Index               |
|FVRR  |Global Transaction Index         |
|MELI  |Mercado Pago Transaction Index   |
|MELI  |Mercado Libre Transaction Index  |
+------+---------------------------------+



In [4]:
# Choose ONE reporting cadence per (TICKER, INDEXNAME) pair: the DURATION with
# the smallest rank in duration_map. The rank is a preference order (lower
# wins); note that "Mid-month" and "Custom Quarter" are ranked after the
# regular calendar cadences.
duration_map = [
    ("Week", 0),
    ("Month", 1),
    ("Quarter", 2),
    ("Year", 3),
    ("Mid-month", 4),
    ("Custom Quarter", 5),
]

# Lookup frame so the preference order can be joined onto the data.
duration_lookup = spark.createDataFrame(duration_map, ["DURATION", "duration_rank"])

# One row per (TICKER, INDEXNAME) holding the preferred DURATION for that pair.
lowest_duration_idx = (
    raw.select("TICKER", "INDEXNAME", "DURATION")
       .dropDuplicates()
       # Left join: a DURATION that is missing from duration_map is kept and
       # receives a NULL duration_rank.
       .join(duration_lookup, "DURATION", "left")
       .withColumn(
           "rn",
           f.row_number().over(
               Window.partitionBy("TICKER", "INDEXNAME")
                     # Spark sorts NULLs FIRST in ascending order, which would let an
                     # unmapped duration beat every ranked cadence. Sort NULL ranks
                     # last so they are only chosen when nothing ranked exists; the
                     # DURATION name breaks any remaining tie deterministically.
                     .orderBy(f.col("duration_rank").asc_nulls_last(), f.col("DURATION"))
           )
       )
       .filter(f.col("rn") == 1)
       .select("TICKER", "INDEXNAME", "DURATION")
)

In [5]:
from pyspark.sql import functions as f, Window

# Keep only the observations recorded at the cadence selected for each
# (ticker, index) pair, and parse PERIODEND into a date for date arithmetic.
filtered = raw.join(
    lowest_duration_idx,
    on=["TICKER", "INDEXNAME", "DURATION"],
    how="inner",
).withColumn("period_end", f.to_date("PERIODEND"))

# Chronological order inside each (ticker, index) series.
w = Window.partitionBy("TICKER", "INDEXNAME").orderBy("period_end")

# Stage 1: attach the previous observation's end date and cumulative value.
# The first observation of every series has no predecessor and is dropped.
_with_previous = (
    filtered
    .withColumn("prev_end", f.lag(f.col("period_end"), 1).over(w))
    .withColumn("prev_cum", f.lag(f.col("CUMULATIVEVALUE"), 1).over(w))
    .where(f.col("prev_end").isNotNull())
)

# Stage 2: the span each observation is spread over, and the even per-day
# share of its VALUE.
# NOTE(review): the span runs from the previous period end through the day
# BEFORE this period end, so the running total reaches prev_cum + VALUE one
# day ahead of period_end. Confirm this matches the PERIODEND convention of
# the source data.
_with_span = (
    _with_previous
    .withColumn("span_start", f.col("prev_end"))
    .withColumn("span_end", f.date_sub(f.col("period_end"), 1))
    .withColumn(
        "days_in_period",
        f.datediff(f.col("span_end"), f.col("span_start")) + 1,
    )
    .withColumn("daily_value", f.col("VALUE") / f.col("days_in_period"))
)

# Stage 3: one output row per calendar day in the span.
_per_day = (
    _with_span
    .withColumn(
        "DAY_DATE",
        f.explode(f.sequence(f.col("span_start"), f.col("span_end"))),
    )
    .withColumn("day_offset", f.datediff(f.col("DAY_DATE"), f.col("span_start")))
)

# Stage 4: final daily layout. VALUE becomes the per-day share and
# CUMULATIVEVALUE the running total built up from the previous observation.
daily_rows = _per_day.select(
    "TICKER",
    "INDEXNAME",
    "DAY_DATE",
    f.lit("Day").alias("DURATION"),
    f.col("daily_value").alias("VALUE"),
    (
        f.col("prev_cum") + (f.col("day_offset") + 1) * f.col("daily_value")
    ).alias("CUMULATIVEVALUE"),
)

# Put the daily rows into the raw layout (PERIODEND as a string) and append
# them to the original data. Columns that exist only in `raw` are filled with
# NULL for the daily rows (allowMissingColumns=True). The filter then keeps
# only rows whose duration is 'Day', so rows of `raw` at any other cadence do
# not appear in the result.
_daily_in_raw_layout = daily_rows.select(
    "TICKER",
    "INDEXNAME",
    f.col("DAY_DATE").cast("string").alias("PERIODEND"),
    "DURATION",
    "VALUE",
    "CUMULATIVEVALUE",
)

result_with_daily = (
    raw.unionByName(_daily_in_raw_layout, allowMissingColumns=True)
    .filter("duration = 'Day'")
    .orderBy("TICKER", "INDEXNAME", "PERIODEND")
)


# Export the result as ONE csv file. Spark's csv writer produces a directory
# of part files, so write into a scratch directory first, then move the single
# part file to its final name and remove the scratch directory.
output_dir = "output/result_with_daily_tmp"
final_path = "output/result_with_daily.csv"

(
    result_with_daily
    .coalesce(1)                 # single partition -> at most one part file
    .write
    .mode("overwrite")           # replace any scratch directory left by an earlier run
    .option("header", True)
    .csv(output_dir)
)

# Sorted so the chosen file is deterministic if more than one ever matches.
part_files = sorted(glob.glob(os.path.join(output_dir, "part-*.csv")))
if not part_files:
    # Fail with an explicit message instead of an IndexError on [0]; the
    # scratch directory is left in place so it can be inspected.
    raise FileNotFoundError(
        f"Spark wrote no part-*.csv file under {output_dir!r}"
    )
part_file = part_files[0]
shutil.move(part_file, final_path)
shutil.rmtree(output_dir)

                                                                                

In [6]:
import plotly.express as px
import pandas as pd
from pyspark.sql import functions as f

# Collect the daily series into pandas for plotting. `ticker_index` joins
# TICKER and INDEXNAME with "-" so every (ticker, index) series gets its own
# line and legend entry.
pdf = (
    result_with_daily
    .withColumn("ticker_index", f.concat_ws("-", f.col("TICKER"), f.col("INDEXNAME")))
).toPandas()


# PERIODEND is a string at this point; convert it so the x axis is a real
# time axis.
pdf["PERIODEND"] = pd.to_datetime(pdf["PERIODEND"])

fig = px.line(
    pdf,
    x="PERIODEND",
    y="CUMULATIVEVALUE",
    color="ticker_index",                         # separate line per (ticker, index)
    # The frame is not filtered to a single ticker, so the title must not
    # claim the chart shows MELI only.
    title="Daily CUMULATIVEVALUE by Ticker and Index",
    markers=True,
    labels={"PERIODEND": "Date", "CUMULATIVEVALUE": "CUMULATIVEVALUE", "ticker_index": "Index"},
)

fig.show()
