In [1]:
import findspark 

findspark.init()
findspark.find()

'C:/tools/spark-3.3.2-bin-hadoop3'

In [47]:
import os

In [2]:
import pyspark
from pyspark.sql import SparkSession

In [4]:
spark = SparkSession.builder \
    .master('local[*]') \
    .appName("spark_session") \
    .getOrCreate()

In [6]:
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

In [5]:
parsed_data = "parsed_data_daily_activity.csv"

In [7]:
spark_df = spark.read.csv(parsed_data,
                    header='true',
                    inferSchema='true',
                    ignoreLeadingWhiteSpace=True,
                    ignoreTrailingWhiteSpace=True)

In [37]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import pyspark.sql.functions as F
from pyspark.sql.functions import lead, lag, col

In [9]:
spark_df = spark_df.withColumn("ActivityDate", F.to_date("ActivityDate", "MM/dd/yyyy"))

In [13]:
spark_df = spark_df.selectExpr(
    "Id",
    "ActivityDate",
    "TotalSteps",
    "TotalDistance",
    "Calories"
)

In [17]:
spark_df = spark_df.withColumnRenamed("ActivityDate", "dateFor")

In [28]:
windowSpec = Window.partitionBy("Id").orderBy(F.col('dateFor').asc())

In [29]:
df_with_diff = spark_df \
    .withColumn("TotalStepsDiff", F.coalesce(F.col("TotalSteps") - F.lag("TotalSteps", 1).over(windowSpec), F.lit(0)) ) \
    .withColumn("TotalDistanceDiff", F.coalesce(F.col("TotalDistance") - F.lag("TotalDistance", 1).over(windowSpec), F.lit(0)) ) \
    .withColumn("CaloriesDiff", F.coalesce(F.col("Calories") - F.lag("Calories", 1).over(windowSpec), F.lit(0)) )

In [30]:
df_with_diff

DataFrame[Id: bigint, dateFor: date, TotalSteps: int, TotalDistance: double, Calories: int, TotalStepsDiff: int, TotalDistanceDiff: double, CaloriesDiff: int]

In [31]:
df_with_diff.printSchema()

root
 |-- Id: long (nullable = true)
 |-- dateFor: date (nullable = true)
 |-- TotalSteps: integer (nullable = true)
 |-- TotalDistance: double (nullable = true)
 |-- Calories: integer (nullable = true)
 |-- TotalStepsDiff: integer (nullable = false)
 |-- TotalDistanceDiff: double (nullable = false)
 |-- CaloriesDiff: integer (nullable = false)



In [39]:
spark_df.printSchema()

root
 |-- Id: long (nullable = true)
 |-- dateFor: date (nullable = true)
 |-- TotalSteps: integer (nullable = true)
 |-- TotalDistance: double (nullable = true)
 |-- Calories: integer (nullable = true)



In [40]:
spark_df_start = spark.read.csv(parsed_data,
                    header='true',
                    inferSchema='true',
                    ignoreLeadingWhiteSpace=True,
                    ignoreTrailingWhiteSpace=True)

In [41]:
total_minutes = spark_df_start \
        .withColumn("TotalMinutes", col("VeryActiveMinutes") + col("FairlyActiveMinutes") + col("LightlyActiveMinutes") + col("SedentaryMinutes"))

In [42]:
df_activity_percentage = total_minutes.selectExpr(
    "Id",
    "ActivityDate",
    "VeryActiveMinutes / TotalMinutes * 100 as VeryActivePercentage",
    "FairlyActiveMinutes / TotalMinutes * 100 as FairlyActivePercentage",
    "LightlyActiveMinutes / TotalMinutes * 100 as LightlyActivePercentage",
    "SedentaryMinutes / TotalMinutes * 100 as SedentaryPercentage"
)

In [44]:
result_diff = df_with_diff.fillna(0)
result_activity = df_activity_percentage.fillna(0)

In [46]:
result_diff.toPandas().to_csv('/opt/airflow/spark_files/result_diff.csv',
                         sep=',',
                         header=True,
                         index=False)
result_activity.toPandas().to_csv('/opt/airflow/spark_files/result_activity.csv',
                         sep=',',
                         header=True,
                         index=False)

OSError: Cannot save file into a non-existent directory: '\opt\airflow\spark_files'

In [None]:
os.remove(parsed_data)