In [1]:
import pyspark
import pandas as pd
import numpy as np
spark = pyspark.sql.SparkSession.builder.getOrCreate()
from pyspark.sql.functions import *
from pyspark.sql.functions import lit
from pyspark.sql.functions import regexp_extract, regexp_replace

#### Remember: you have to 'register' a table to query it with spark.sql
- mpg.createOrReplaceTempView("mpg")

In [2]:
pandas_df = pd.read_csv('activities.csv')

In [3]:
pandas_df.head()

Unnamed: 0,Activity ID,Activity Date,Activity Name,Activity Type,Activity Description,Elapsed Time,Distance,Max Heart Rate,Relative Effort,Commute,...,UV Index,Weather Ozone,"<span class=""translation_missing"" title=""translation missing: en-US.lib.export.portability_exporter.activities.horton_values.jump_count"">Jump Count</span>","<span class=""translation_missing"" title=""translation missing: en-US.lib.export.portability_exporter.activities.horton_values.total_grit"">Total Grit</span>","<span class=""translation_missing"" title=""translation missing: en-US.lib.export.portability_exporter.activities.horton_values.avg_flow"">Avg Flow</span>","<span class=""translation_missing"" title=""translation missing: en-US.lib.export.portability_exporter.activities.horton_values.flagged"">Flagged</span>","<span class=""translation_missing"" title=""translation missing: en-US.lib.export.portability_exporter.activities.horton_values.avg_elapsed_speed"">Avg Elapsed Speed</span>","<span class=""translation_missing"" title=""translation missing: en-US.lib.export.portability_exporter.activities.horton_values.dirt_distance"">Dirt Distance</span>","<span class=""translation_missing"" title=""translation missing: en-US.lib.export.portability_exporter.activities.horton_values.newly_explored_distance"">Newly Explored Distance</span>","<span class=""translation_missing"" title=""translation missing: en-US.lib.export.portability_exporter.activities.horton_values.newly_explored_dirt_distance"">Newly Explored Dirt Distance</span>"
0,350128633,"Jul 20, 2015, 1:06:57 PM",Renecca creek rd,Ride,,2352,13.48,,,False,...,,,,,,,,,,
1,623600149,"Oct 11, 2015, 10:03:14 PM",Afternoon Ride,Ride,,5495,23.61,,,False,...,,,,,,,,,,
2,797001969,"Dec 8, 2016, 4:28:12 PM",Morning Ride,Ride,,8,0.03,,,False,...,,,,,,,,,,
3,800888948,"Dec 12, 2016, 10:22:44 PM",Afternoon Ride,Ride,,7942,10.22,149.0,8.0,False,...,,,,,,,,,,
4,811340614,"Dec 25, 2016, 10:37:32 PM",Afternoon Ride,Ride,,4983,26.16,162.0,32.0,False,...,,,,,,,,,,


In [4]:
pandas_df['Activity Type'].value_counts()

Ride    346
Run     165
Hike     10
Name: Activity Type, dtype: int64

In [5]:
pandas_df.columns.to_list()

['Activity ID',
 'Activity Date',
 'Activity Name',
 'Activity Type',
 'Activity Description',
 'Elapsed Time',
 'Distance',
 'Max Heart Rate',
 'Relative Effort',
 'Commute',
 'Activity Gear',
 'Filename',
 'Athlete Weight',
 'Bike Weight',
 'Elapsed Time.1',
 'Moving Time',
 'Distance.1',
 'Max Speed',
 'Average Speed',
 'Elevation Gain',
 'Elevation Loss',
 'Elevation Low',
 'Elevation High',
 'Max Grade',
 'Average Grade',
 'Average Positive Grade',
 'Average Negative Grade',
 'Max Cadence',
 'Average Cadence',
 'Max Heart Rate.1',
 'Average Heart Rate',
 'Max Watts',
 'Average Watts',
 'Calories',
 'Max Temperature',
 'Average Temperature',
 'Relative Effort.1',
 'Total Work',
 'Number of Runs',
 'Uphill Time',
 'Downhill Time',
 'Other Time',
 'Perceived Exertion',
 '<span class="translation_missing" title="translation missing: en-US.lib.export.portability_exporter.activities.horton_values.type">Type</span>',
 '<span class="translation_missing" title="translation missing: en-US.l

In [6]:
cols = ['Activity ID',
 'Activity Date',
 'Activity Name',
 'Activity Type',
 'Elapsed Time',
 'Distance',
 'Elapsed Time.1',
 'Moving Time',
 'Distance.1',
 'Max Speed',
 'Average Speed',
 'Elevation Gain',
 'Elevation Loss',
 'Elevation Low',
 'Elevation High',
 'Max Grade',
 'Average Grade',
 'Average Watts',
 'Calories']

In [7]:
pandas_df[cols].isna().sum()

Activity ID         0
Activity Date       0
Activity Name       0
Activity Type       0
Elapsed Time        0
Distance            0
Elapsed Time.1      5
Moving Time         0
Distance.1          0
Max Speed           0
Average Speed     148
Elevation Gain      4
Elevation Loss    155
Elevation Low       0
Elevation High      0
Max Grade           0
Average Grade       0
Average Watts     176
Calories           38
dtype: int64

In [8]:
activities = pandas_df[cols]

In [9]:
activities = activities.rename(columns={'Activity ID':'Activity_ID',
 'Activity Date': 'Activity_date',
 'Activity Name': 'Activity_name',
 'Activity Type': 'Activity_Type',
 'Elapsed Time': 'Elapsed_Time_Secs',
 'Elapsed Time.1':'Elapsed_Time_1',
 'Distance': 'Distance_KM',
 'Moving Time': 'Moving_Time_Secs',
 'Distance.1': 'Distance_1',
 'Max Speed':'Max_Speed',
 'Average Speed':'Average_Speed',
 'Elevation Gain': 'Elevation_Gain',
 'Elevation Loss':'Elevation_Loss',
 'Elevation Low':'Elevation_Low',
 'Elevation High':'Elevation_High',
 'Max Grade':'Max_Grade',
 'Average Grade':'Average_Grade',
 'Average Watts':'Average_Watts'})

In [10]:
activities.head().T

Unnamed: 0,0,1,2,3,4
Activity_ID,350128633,623600149,797001969,800888948,811340614
Activity_date,"Jul 20, 2015, 1:06:57 PM","Oct 11, 2015, 10:03:14 PM","Dec 8, 2016, 4:28:12 PM","Dec 12, 2016, 10:22:44 PM","Dec 25, 2016, 10:37:32 PM"
Activity_name,Renecca creek rd,Afternoon Ride,Morning Ride,Afternoon Ride,Afternoon Ride
Activity_Type,Ride,Ride,Ride,Ride,Ride
Elapsed_Time_Secs,2352,5495,8,7942,4983
Distance_KM,13.48,23.61,0.03,10.22,26.16
Elapsed_Time_1,2352.0,5495.0,8.0,7942.0,4983.0
Moving_Time_Secs,2272.0,3965.0,8.0,1851.0,4424.0
Distance_1,13482.599609,23613.199219,30.6,10228.599609,26168.800781
Max_Speed,16.0,15.1,0.2,14.5,15.1


In [11]:
activities.to_csv('activities2.csv')

In [12]:
# df = spark.createDataFrame(pandas_df)


# the usual method is throwing errors related to data types
# some columns have multiple dtypes (NaN, values, etc); giving pyspark issues

In [13]:
df = (spark.read.format("csv").options(header="true")
    .load("activities2.csv"))

In [14]:
df.show(5, vertical=True)

-RECORD 0---------------------------------
 _c0               | 0                    
 Activity_ID       | 350128633            
 Activity_date     | Jul 20, 2015, 1:0... 
 Activity_name     | Renecca creek rd     
 Activity_Type     | Ride                 
 Elapsed_Time_Secs | 2352                 
 Distance_KM       | 13.48                
 Elapsed_Time_1    | 2352.0               
 Moving_Time_Secs  | 2272.0               
 Distance_1        | 13482.599609375      
 Max_Speed         | 16.0                 
 Average_Speed     | null                 
 Elevation_Gain    | 187.07899475097656   
 Elevation_Loss    | null                 
 Elevation_Low     | 283.79998779296875   
 Elevation_High    | 406.8999938964844    
 Max_Grade         | 16.799999237060547   
 Average_Grade     | 0.7268630266189575   
 Average_Watts     | null                 
 Calories          | null                 
-RECORD 1---------------------------------
 _c0               | 1                    
 Activity_I

In [15]:
print('DataFrame shape', df.count(), ' x ', len(df.columns))

DataFrame shape 521  x  20


In [16]:
# df.describe().show(vertical = True)

In [17]:
df.createOrReplaceTempView("df")

In [19]:
spark.sql(
    """
SELECT Distance_KM, Elapsed_Time_Secs, Distance_KM/(Moving_Time_Secs / 3600) AS Avg_Speed_km_hr
FROM df
"""
).show(5)

+-----------+-----------------+------------------+
|Distance_KM|Elapsed_Time_Secs|   Avg_Speed_km_hr|
+-----------+-----------------+------------------+
|      13.48|             2352|21.359154929577468|
|      23.61|             5495|21.436569987389657|
|       0.03|                8|              13.5|
|      10.22|             7942| 19.87682333873582|
|      26.16|             4983|  21.2875226039783|
+-----------+-----------------+------------------+
only showing top 5 rows

