#PySpark Task - Fellowship Data
#Data Manipulation using CSV File

In [0]:
df = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/evashrestha99@gmail.com/fellowship_data.csv")

In [0]:
df.show(100)

+--------+------------+-----+--------+--------+
|Duration|        Date|Pulse|Maxpulse|Calories|
+--------+------------+-----+--------+--------+
|      60|'2020/12/01'|  110|     130|   409.1|
|      60|'2020/12/02'|  117|     145|   479.0|
|      60|'2020/12/03'|  103|     135|   340.0|
|      45|'2020/12/04'|  109|     175|   282.4|
|      45|'2020/12/05'|  117|     148|   406.0|
|      60|'2020/12/06'|  102|     127|   300.0|
|      60|'2020/12/07'|  110|     136|   374.0|
|     450|'2020/12/08'|  104|     134|   253.3|
|      30|'2020/12/09'|  109|     133|   195.1|
|      60|'2020/12/10'|   98|     124|   269.0|
|      60|'2020/12/11'|  103|     147|   329.3|
|      60|'2020/12/12'|  100|     120|   250.7|
|      60|'2020/12/12'|  100|     120|   250.7|
|      60|'2020/12/13'|  106|     128|   345.3|
|      60|'2020/12/14'|  104|     132|   379.3|
|      60|'2020/12/15'|   98|     123|   275.0|
|      60|'2020/12/16'|   98|     120|   215.2|
|      60|'2020/12/17'|  100|     120|  

In [0]:
df.dtypes

Out[18]: [('Duration', 'string'),
 ('Date', 'string'),
 ('Pulse', 'string'),
 ('Maxpulse', 'string'),
 ('Calories', 'string')]

#Convert the odd string in Date column to a format like others

In [0]:
from pyspark.sql.functions import date_format, to_date, when, regexp_replace, col

In [0]:
df = df.withColumn('Date', when(to_date(df['Date'],'yyyyMMdd').isNotNull(), date_format(to_date(df['Date'], 'yyyyMMdd'),'yyyy/MM/dd')).otherwise(col('Date')))


In [0]:
df.show(100)

+--------+------------+-----+--------+--------+
|Duration|        Date|Pulse|Maxpulse|Calories|
+--------+------------+-----+--------+--------+
|      60|'2020/12/01'|  110|     130|   409.1|
|      60|'2020/12/02'|  117|     145|   479.0|
|      60|'2020/12/03'|  103|     135|   340.0|
|      45|'2020/12/04'|  109|     175|   282.4|
|      45|'2020/12/05'|  117|     148|   406.0|
|      60|'2020/12/06'|  102|     127|   300.0|
|      60|'2020/12/07'|  110|     136|   374.0|
|     450|'2020/12/08'|  104|     134|   253.3|
|      30|'2020/12/09'|  109|     133|   195.1|
|      60|'2020/12/10'|   98|     124|   269.0|
|      60|'2020/12/11'|  103|     147|   329.3|
|      60|'2020/12/12'|  100|     120|   250.7|
|      60|'2020/12/12'|  100|     120|   250.7|
|      60|'2020/12/13'|  106|     128|   345.3|
|      60|'2020/12/14'|  104|     132|   379.3|
|      60|'2020/12/15'|   98|     123|   275.0|
|      60|'2020/12/16'|   98|     120|   215.2|
|      60|'2020/12/17'|  100|     120|  

In [0]:
# Replacing single quotation marks in the 'Date' column

df = df.withColumn('Date', regexp_replace(df['Date'], "'", ""))

In [0]:
df.show()

+--------+----------+-----+--------+--------+
|Duration|      Date|Pulse|Maxpulse|Calories|
+--------+----------+-----+--------+--------+
|      60|2020/12/01|  110|     130|   409.1|
|      60|2020/12/02|  117|     145|   479.0|
|      60|2020/12/03|  103|     135|   340.0|
|      45|2020/12/04|  109|     175|   282.4|
|      45|2020/12/05|  117|     148|   406.0|
|      60|2020/12/06|  102|     127|   300.0|
|      60|2020/12/07|  110|     136|   374.0|
|     450|2020/12/08|  104|     134|   253.3|
|      30|2020/12/09|  109|     133|   195.1|
|      60|2020/12/10|   98|     124|   269.0|
|      60|2020/12/11|  103|     147|   329.3|
|      60|2020/12/12|  100|     120|   250.7|
|      60|2020/12/12|  100|     120|   250.7|
|      60|2020/12/13|  106|     128|   345.3|
|      60|2020/12/14|  104|     132|   379.3|
|      60|2020/12/15|   98|     123|   275.0|
|      60|2020/12/16|   98|     120|   215.2|
|      60|2020/12/17|  100|     120|   300.0|
|      45|2020/12/18|   90|     11

#Convert the Date column to a datetime object.

In [0]:
df = df.withColumn('Date', to_date(df['Date'], 'yyyy/MM/dd'))
df.dtypes


Out[26]: [('Duration', 'string'),
 ('Date', 'date'),
 ('Pulse', 'string'),
 ('Maxpulse', 'string'),
 ('Calories', 'string')]

In [0]:
df.show()

+--------+----------+-----+--------+--------+
|Duration|      Date|Pulse|Maxpulse|Calories|
+--------+----------+-----+--------+--------+
|      60|2020-12-01|  110|     130|   409.1|
|      60|2020-12-02|  117|     145|   479.0|
|      60|2020-12-03|  103|     135|   340.0|
|      45|2020-12-04|  109|     175|   282.4|
|      45|2020-12-05|  117|     148|   406.0|
|      60|2020-12-06|  102|     127|   300.0|
|      60|2020-12-07|  110|     136|   374.0|
|     450|2020-12-08|  104|     134|   253.3|
|      30|2020-12-09|  109|     133|   195.1|
|      60|2020-12-10|   98|     124|   269.0|
|      60|2020-12-11|  103|     147|   329.3|
|      60|2020-12-12|  100|     120|   250.7|
|      60|2020-12-12|  100|     120|   250.7|
|      60|2020-12-13|  106|     128|   345.3|
|      60|2020-12-14|  104|     132|   379.3|
|      60|2020-12-15|   98|     123|   275.0|
|      60|2020-12-16|   98|     120|   215.2|
|      60|2020-12-17|  100|     120|   300.0|
|      45|2020-12-18|   90|     11

#Find the if there are any duplicates

In [0]:
duplicate_rows = df.groupBy(df.columns).count().filter(col("count")>1)
duplicate_rows.show()

+--------+----------+-----+--------+--------+-----+
|Duration|      Date|Pulse|Maxpulse|Calories|count|
+--------+----------+-----+--------+--------+-----+
|      60|2020-12-12|  100|     120|   250.7|    2|
+--------+----------+-----+--------+--------+-----+



In [0]:
# removing duplicates

df = df.dropDuplicates()

In [0]:
#checking for duplicate again

duplicate_rows = df.groupBy(df.columns).count().filter(col("count")>1)
duplicate_rows.show()

+--------+----+-----+--------+--------+-----+
|Duration|Date|Pulse|Maxpulse|Calories|count|
+--------+----+-----+--------+--------+-----+
+--------+----+-----+--------+--------+-----+



#Create a column that finds difference in Max Pulse and pulse

In [0]:
df = df.withColumn("PulseDifference", df['Maxpulse']-df['Pulse'])
df.show()

+--------+----------+-----+--------+--------+---------------+
|Duration|      Date|Pulse|Maxpulse|Calories|PulseDifference|
+--------+----------+-----+--------+--------+---------------+
|      60|2020-12-14|  104|     132|   379.3|           28.0|
|      60|2020-12-03|  103|     135|   340.0|           32.0|
|      45|2020-12-18|   90|     112|    null|           22.0|
|      60|2020-12-17|  100|     120|   300.0|           20.0|
|      60|2020-12-10|   98|     124|   269.0|           26.0|
|      45|2020-12-20|   97|     125|   243.0|           28.0|
|      60|2020-12-12|  100|     120|   250.7|           20.0|
|      60|2020-12-06|  102|     127|   300.0|           25.0|
|      30|2020-12-09|  109|     133|   195.1|           24.0|
|      45|2020-12-04|  109|     175|   282.4|           66.0|
|      60|2020-12-19|  103|     123|   323.0|           20.0|
|      60|2020-12-13|  106|     128|   345.3|           22.0|
|      60|2020-12-11|  103|     147|   329.3|           44.0|
|      6

#Find how many calories they burned per second.

In [0]:
df = df.withColumn('CaloriesPerSecond', df['Calories']/(df['Duration']*60))
df.show()

+--------+----------+-----+--------+--------+---------------+--------------------+
|Duration|      Date|Pulse|Maxpulse|Calories|PulseDifference|   CaloriesPerSecond|
+--------+----------+-----+--------+--------+---------------+--------------------+
|      60|2020-12-14|  104|     132|   379.3|           28.0| 0.10536111111111111|
|      60|2020-12-03|  103|     135|   340.0|           32.0| 0.09444444444444444|
|      45|2020-12-18|   90|     112|    null|           22.0|                null|
|      60|2020-12-17|  100|     120|   300.0|           20.0| 0.08333333333333333|
|      60|2020-12-10|   98|     124|   269.0|           26.0| 0.07472222222222222|
|      45|2020-12-20|   97|     125|   243.0|           28.0|                0.09|
|      60|2020-12-12|  100|     120|   250.7|           20.0| 0.06963888888888889|
|      60|2020-12-06|  102|     127|   300.0|           25.0| 0.08333333333333333|
|      30|2020-12-09|  109|     133|   195.1|           24.0| 0.10838888888888888|
|   

#Sort the data frame by calories.

In [0]:
df = df.orderBy("Calories")
df.show()

+--------+----------+-----+--------+--------+---------------+--------------------+
|Duration|      Date|Pulse|Maxpulse|Calories|PulseDifference|   CaloriesPerSecond|
+--------+----------+-----+--------+--------+---------------+--------------------+
|      45|2020-12-18|   90|     112|    null|           22.0|                null|
|      60|2020-12-28|  103|     132|    null|           29.0|                null|
|      30|2020-12-09|  109|     133|   195.1|           24.0| 0.10838888888888888|
|      60|2020-12-16|   98|     120|   215.2|           22.0| 0.05977777777777778|
|      60|2020-12-27|   92|     118|   241.0|           26.0| 0.06694444444444445|
|      45|2020-12-20|   97|     125|   243.0|           28.0|                0.09|
|      60|2020-12-31|   92|     115|   243.0|           23.0|              0.0675|
|      45|2020-12-24|  105|     132|   246.0|           27.0| 0.09111111111111111|
|      60|2020-12-26|  100|     120|   250.0|           20.0| 0.06944444444444445|
|   

#Retrieve a subset of the dataframes 
where calories are higher than 400, less than 200 and between 200-300.

In [0]:
subset_1 = df.filter(df['Calories'] > 400)

subset_2 = df.filter(df['Calories'] < 200)

subset_3 = df.filter((df['Calories'] >= 200) & (df['Calories'] <= 300))

combined_df = subset_1.union(subset_2).union(subset_3)
combined_df.show()

+--------+----------+-----+--------+--------+---------------+--------------------+
|Duration|      Date|Pulse|Maxpulse|Calories|PulseDifference|   CaloriesPerSecond|
+--------+----------+-----+--------+--------+---------------+--------------------+
|      45|2020-12-05|  117|     148|   406.0|           31.0| 0.15037037037037038|
|      60|2020-12-01|  110|     130|   409.1|           20.0|  0.1136388888888889|
|      60|2020-12-02|  117|     145|   479.0|           28.0| 0.13305555555555557|
|      30|2020-12-09|  109|     133|   195.1|           24.0| 0.10838888888888888|
|      60|2020-12-16|   98|     120|   215.2|           22.0| 0.05977777777777778|
|      60|2020-12-27|   92|     118|   241.0|           26.0| 0.06694444444444445|
|      45|2020-12-20|   97|     125|   243.0|           28.0|                0.09|
|      60|2020-12-31|   92|     115|   243.0|           23.0|              0.0675|
|      45|2020-12-24|  105|     132|   246.0|           27.0| 0.09111111111111111|
|   

#Find Unique values in duration

In [0]:
unique_durations = df.select("duration").distinct()
unique_durations.show()

+--------+
|duration|
+--------+
|      30|
|      60|
|     450|
|      45|
+--------+



#Count how many missing values are there in the dataframe

In [0]:
null_val = []
for column in df.columns:
    null_count = df.filter(col(column).isNull()).count()
    null_val.append((column, null_count))

null_val

Out[36]: [('Duration', 0),
 ('Date', 1),
 ('Pulse', 0),
 ('Maxpulse', 0),
 ('Calories', 2),
 ('PulseDifference', 0),
 ('CaloriesPerSecond', 2)]

#There is an anomalous value in the data. Replace it.

In [0]:
display(df)

Duration,Date,Pulse,Maxpulse,Calories,PulseDifference,CaloriesPerSecond
45,2020-12-18,90,112,,22.0,
60,2020-12-28,103,132,,29.0,
30,2020-12-09,109,133,195.1,24.0,0.1083888888888888
60,2020-12-16,98,120,215.2,22.0,0.0597777777777777
60,2020-12-27,92,118,241.0,26.0,0.0669444444444444
45,2020-12-20,97,125,243.0,28.0,0.09
60,2020-12-31,92,115,243.0,23.0,0.0675
45,2020-12-24,105,132,246.0,27.0,0.0911111111111111
60,2020-12-26,100,120,250.0,20.0,0.0694444444444444
60,2020-12-12,100,120,250.7,20.0,0.0696388888888888


In [0]:
df = df.withColumn("Duration", when(col("Duration") > 60, 60).otherwise(col("Duration")))

In [0]:
display(df)

Duration,Date,Pulse,Maxpulse,Calories,PulseDifference,CaloriesPerSecond
45,2020-12-18,90,112,,22.0,
60,2020-12-28,103,132,,29.0,
30,2020-12-09,109,133,195.1,24.0,0.1083888888888888
60,2020-12-16,98,120,215.2,22.0,0.0597777777777777
60,2020-12-27,92,118,241.0,26.0,0.0669444444444444
45,2020-12-20,97,125,243.0,28.0,0.09
60,2020-12-31,92,115,243.0,23.0,0.0675
45,2020-12-24,105,132,246.0,27.0,0.0911111111111111
60,2020-12-26,100,120,250.0,20.0,0.0694444444444444
60,2020-12-12,100,120,250.7,20.0,0.0696388888888888
