In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.types import StructType, StructField, StringType, DecimalType
from pyspark.sql.window import Window


def _read_csv(spark: SparkSession,
              filepath: str) -> DataFrame:
    """Load the per-year CSV into a Spark DataFrame using a fixed schema.

    Only ``danceability`` and ``energy`` are parsed as ``DecimalType(17, 16)``
    (at most one digit before the decimal point); every other column,
    including ``year``, is kept as a string.

    Args:
        spark: Session used to read the file.
        filepath: Path of the comma-separated file; its first line is a header.

    Returns:
        The unevaluated DataFrame with the 14 columns in file order.
    """
    decimal_columns = {"danceability", "energy"}
    column_order = (
        "year", "acousticness", "danceability", "duration_ms", "energy",
        "instrumentalness", "liveness", "loudness", "speechiness", "tempo",
        "valence", "popularity", "key", "mode",
    )
    schema = StructType([
        StructField(name, DecimalType(17, 16) if name in decimal_columns else StringType())
        for name in column_order
    ])

    reader_options = dict(sep=",", header=True, quote='"', encoding="UTF-8")
    return spark.read.csv(filepath, schema=schema, **reader_options)


def _add_prev_years(df: DataFrame) -> DataFrame:
    """Attach the preceding row's ``danceability`` and ``energy`` to each row.

    Adds ``prev_year_danceability`` then ``prev_year_energy``. Rows are ordered by
    ``year`` in a single window with no partition columns, so the earliest row gets
    nulls.

    Notes:
        ``lag`` looks at the preceding *row*, not at ``year - 1``: if a year is
        missing from the data, the next row is paired with an older year.
        When the frame comes from ``_read_csv``, ``year`` is a string, so the
        ordering is lexicographic (correct for 4-digit years).
    """
    by_year = Window.orderBy("year")
    lagged_specs = (
        ("prev_year_danceability", "danceability"),
        ("prev_year_energy", "energy"),
    )

    result = df
    for target, source in lagged_specs:
        result = result.withColumn(target, F.lag(F.col(source)).over(by_year))
    return result


def _calculate_delta(df: DataFrame) -> DataFrame:
    """Append year-over-year deltas: current value minus the ``prev_year_*`` value.

    Expects the ``prev_year_danceability`` / ``prev_year_energy`` columns (as added
    by ``_add_prev_years``). Adds ``danceability_delta`` then ``energy_delta``;
    a null previous value yields a null delta.
    """
    delta_specs = (
        ("danceability_delta", "danceability", "prev_year_danceability"),
        ("energy_delta", "energy", "prev_year_energy"),
    )

    result = df
    for target, current, previous in delta_specs:
        result = result.withColumn(target, F.col(current) - F.col(previous))
    return result


def main(raw_data_location: str = "./data_by_year.csv",
         num_rows: int = 5) -> None:
    """Run the year-over-year delta job and print a preview.

    Reads the per-year CSV, adds the previous row's ``energy`` and
    ``danceability`` to each year, computes the deltas and shows the first
    ``num_rows`` rows of the selected columns.

    Args:
        raw_data_location: Path of the input CSV. Defaults to the location that
            used to be hard-coded, so a bare ``main()`` call behaves as before.
        num_rows: Number of rows passed to ``DataFrame.show``.
    """
    spark = SparkSession.builder.appName("vectra_task").getOrCreate()

    df_raw = _read_csv(
        spark=spark,
        filepath=raw_data_location)
    df_prev_years = _add_prev_years(df=df_raw)
    df_delta = _calculate_delta(df=df_prev_years)

    df_delta \
        .select(
            F.col("year"),
            F.col("energy"),
            F.col("prev_year_energy"),
            F.col("energy_delta"),
            F.col("danceability"),
            F.col("prev_year_danceability"),
            F.col("danceability_delta")
        ).show(num_rows)


# Inside a Jupyter kernel __name__ is "__main__", so executing this cell runs the job
# (its preview table is the captured output below).
if __name__ == "__main__":
    main()


+----+------------------+------------------+-------------------+------------------+----------------------+-------------------+
|year|            energy|  prev_year_energy|       energy_delta|      danceability|prev_year_danceability| danceability_delta|
+----+------------------+------------------+-------------------+------------------+----------------------+-------------------+
|1920|0.4186995702005730|              null|               null|0.5157501432664760|                  null|               null|
|1921|0.2411363461538462|0.4186995702005730|-0.1775632240467268|0.4321705128205130|    0.5157501432664760|-0.0835796304459630|
|1922|0.2261726446280992|0.2411363461538462|-0.0149637015257470|0.5756198347107437|    0.4321705128205130| 0.1434493218902307|
|1923|0.2624064864864865|0.2261726446280992| 0.0362338418583873|0.5773405405405401|    0.5756198347107437| 0.0017207058297964|
|1924|0.3443466101694912|0.2624064864864865| 0.0819401236830047|0.5498940677966102|    0.5773405405405401|-0.02

In [2]:
import unittest

import pandas as pd
from pyspark.sql import SparkSession


class TestVectraTask(unittest.TestCase):
    """Unit tests for the ``_add_prev_years`` and ``_calculate_delta`` helpers."""

    # Column layouts of the fixtures, in the order the helpers append columns.
    BASE_COLUMNS = ["year", 'energy', 'danceability']
    PREV_COLUMNS = BASE_COLUMNS + ['prev_year_danceability', 'prev_year_energy']
    DELTA_COLUMNS = PREV_COLUMNS + ["danceability_delta", "energy_delta"]

    @classmethod
    def setUpClass(cls):
        # NOTE(review): getOrCreate may return the session created by main() in the
        # previous cell, in which case tearDownClass stops that shared session too.
        cls.spark = SparkSession.builder.appName("vectra_task_test").getOrCreate()

    @classmethod
    def tearDownClass(cls):
        cls.spark.stop()

    def _frame(self, rows, columns):
        """Build a Spark DataFrame from literal rows and column names."""
        return self.spark.createDataFrame(rows, columns)

    def _assert_same(self, actual, expected):
        """Collect both frames to pandas and compare them (row order, names, dtypes)."""
        pd.testing.assert_frame_equal(actual.toPandas(), expected.toPandas())

    def test_add_prev_years(self):
        source = self._frame(
            [("2000", 0.3, 0.2), ("2001", 0.5, 0.3), ("2002", 0.1, 0.3)],
            self.BASE_COLUMNS,
        )
        expected = self._frame(
            [
                ("2000", 0.3, 0.2, None, None),
                ("2001", 0.5, 0.3, 0.2, 0.3),
                ("2002", 0.1, 0.3, 0.3, 0.5),
            ],
            self.PREV_COLUMNS,
        )
        self._assert_same(_add_prev_years(source), expected)

    def test_calculate_delta(self):
        source = self._frame(
            [
                ("2000", 0.3, 0.2, None, None),
                ("2001", 0.5, 0.3, 0.2, 0.3),
                ("2002", 0.1, 0.3, 0.3, 0.5),
            ],
            self.PREV_COLUMNS,
        )
        expected = self._frame(
            [
                ("2000", 0.3, 0.2, None, None, None, None),
                ("2001", 0.5, 0.3, 0.2, 0.3, 0.1, 0.2),
                ("2002", 0.1, 0.3, 0.3, 0.5, 0.0, -0.4),
            ],
            self.DELTA_COLUMNS,
        )
        self._assert_same(_calculate_delta(source), expected)


if __name__ == '__main__':
    # 'ignored' stands in for argv[0] (the kernel's own argv would confuse unittest);
    # verbosity=2 matches the '-v' flag, and exit=False stops unittest from raising
    # SystemExit once the run finishes.
    unittest.main(argv=['ignored'], verbosity=2, exit=False)



  self._sock = None
  self._sock = None
ok
  self._sock = None
  self._sock = None
ok

----------------------------------------------------------------------
Ran 2 tests in 3.154s

OK
