In [1]:
# Link Google Drive
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [11]:
# innstall java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# install spark (change the version number if needed)
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
# unzip the spark file to the current folder
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark
# set your spark folder to your system path environment. 
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [12]:

# install findspark using pip
!pip install -q findspark
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 45 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 56.4 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=bcc91a32e62dd1de269e482e7aad05ee60e249711487c6156c03b1fbfa99c984
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [13]:
# Initialize spark session
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark

In [14]:
# package Needed
import pandas as pd
import numpy as np

In [15]:
# read DataFrame
from pyspark.sql import *
path = '/content/drive/MyDrive/healthdata'
sdf = spark.read.option("header",True).csv(path)

In [16]:
sdf.head(10)

[Row(_c0='0', weather="{'type': 1, 'humidity': 77, 'wind_direction': 0, 'temperature': 20, 'wind_speed': 11}", id='206052367', hydration=None),
 Row(_c0='1', weather="{'type': 4, 'humidity': 72, 'wind_direction': 0, 'temperature': 21, 'wind_speed': 12}", id='206052395', hydration=None),
 Row(_c0='2', weather="{'type': 7, 'humidity': 88, 'wind_direction': 10, 'temperature': 21, 'wind_speed': 12}", id='205571514', hydration=None),
 Row(_c0='3', weather=None, id='205423460', hydration='0.0'),
 Row(_c0='4', weather="{'type': 1, 'humidity': 56, 'wind_direction': 3, 'temperature': 21, 'wind_speed': 9}", id='205157452', hydration=None),
 Row(_c0='5', weather="{'type': 1, 'humidity': 49, 'wind_direction': 13, 'temperature': 22, 'wind_speed': 11}", id='204946831', hydration=None),
 Row(_c0='6', weather="{'type': 4, 'humidity': 59, 'wind_direction': 13, 'temperature': 19, 'wind_speed': 16}", id='204396350', hydration=None),
 Row(_c0='7', weather="{'type': 6, 'humidity': 82, 'wind_direction': 10,

In [17]:
# change type of column
from pyspark.sql.functions import *
from pyspark.sql.types import *
sdf= sdf.withColumn("weather", from_json(col("weather"), MapType(StringType(), StringType())))

In [18]:
# remove null values in dataframe
sdf_cleaned = sdf.withColumn("type",sdf.weather.getItem("type")) \
  .withColumn('humidity',sdf.weather.getItem('humidity')) \
  .withColumn('wind_direction',sdf.weather.getItem('wind_direction')) \
  .withColumn('temperature',sdf.weather.getItem('temperature')) \
    .withColumn('wind_speed',sdf.weather.getItem('wind_speed')) \
  .drop("weather").na.drop(subset=['type','humidity','wind_direction','temperature','wind_speed']) 
sdf_cleaned.show()

+---+---------+---------+----+--------+--------------+-----------+----------+
|_c0|       id|hydration|type|humidity|wind_direction|temperature|wind_speed|
+---+---------+---------+----+--------+--------------+-----------+----------+
|  0|206052367|     null|   1|      77|             0|         20|        11|
|  1|206052395|     null|   4|      72|             0|         21|        12|
|  2|205571514|     null|   7|      88|            10|         21|        12|
|  4|205157452|     null|   1|      56|             3|         21|         9|
|  5|204946831|     null|   1|      49|            13|         22|        11|
|  6|204396350|     null|   4|      59|            13|         19|        16|
|  7|204041350|     null|   6|      82|            10|         14|        29|
|  8|204041362|     null|   6|      76|            10|         15|        27|
|  9|203876509|     null|  16|      71|             8|         15|        29|
| 10|203492477|     null|   4|      72|             7|         1

In [19]:
# read cleaned one
path_pro = '/content/drive/MyDrive/healthdatapro'
df = spark.read.option("header",True).csv(path_pro)

In [20]:
#merge dataframe
final_df = df.join(sdf_cleaned,df.id ==  sdf.id,"inner").drop(df.id)

In [12]:
final_df.show()

+-----+--------------------+--------------------+--------------------+--------------------+--------------------+------+--------------------+--------------------+--------------------+--------------------+-----+---------+---------+----+--------+--------------+-----------+----------+
|  _c0|           longitude|            altitude|            latitude|               sport|          heart_rate|gender|           timestamp|                 url|              userId|               speed|  _c0|       id|hydration|type|humidity|wind_direction|temperature|wind_speed|
+-----+--------------------+--------------------+--------------------+--------------------+--------------------+------+--------------------+--------------------+--------------------+--------------------+-----+---------+---------+----+--------+--------------+-----------+----------+
|49449|[10.1940886, 10.1...|[0.0, 0.0, 0.0, 0...|[56.1326437, 56.1...|                 run|[123, 123, 123, 1...|  male|https://www.endom...|             2

In [21]:
# select feature
final_df = final_df.drop('timestamp','url','_c0')
final_df.show(10)

+--------------------+--------------------+--------------------+--------------------+--------------------+------+--------------------+-----+---------+---------+----+--------+--------------+-----------+----------+
|           longitude|            altitude|            latitude|               sport|          heart_rate|gender|              userId|speed|       id|hydration|type|humidity|wind_direction|temperature|wind_speed|
+--------------------+--------------------+--------------------+--------------------+--------------------+------+--------------------+-----+---------+---------+----+--------+--------------+-----------+----------+
|[10.1940886, 10.1...|[0.0, 0.0, 0.0, 0...|[56.1326437, 56.1...|                 run|[123, 123, 123, 1...|  male|[1351969475, 1351...| null|108799137|     null|   3|      89|             9|          8|        16|
|[20.2725612, 20.2...|[495.6, 496.0, 49...|[67.8380757, 67.8...|cross-country skiing|[111, 111, 111, 1...|  male|             8276277| null|15686129

In [22]:
#downsample
sdf = final_df.sample(False, 0.1, seed=0).limit(10000)
sdf.toPandas().to_csv('/content/drive/MyDrive/downsample.csv')

In [24]:
# Dummy Encoded
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
stringIndexer = StringIndexer(inputCol="gender", outputCol="gender_index")
model = stringIndexer.fit(final_df)
indexed = model.transform(final_df)
stringIndexer = StringIndexer(inputCol="sport", outputCol="sport_index")
model = stringIndexer.fit(indexed)
encoded = model.transform(indexed)
final_df= encoded.drop("gender","sport")

In [25]:
final_df.show(10)

+--------------------+--------------------+--------------------+--------------------+--------------------+-----+---------+---------+----+--------+--------------+-----------+----------+------------+-----------+
|           longitude|            altitude|            latitude|          heart_rate|              userId|speed|       id|hydration|type|humidity|wind_direction|temperature|wind_speed|gender_index|sport_index|
+--------------------+--------------------+--------------------+--------------------+--------------------+-----+---------+---------+----+--------+--------------+-----------+----------+------------+-----------+
|[10.1940886, 10.1...|[0.0, 0.0, 0.0, 0...|[56.1326437, 56.1...|[123, 123, 123, 1...|[1351969475, 1351...| null|108799137|     null|   3|      89|             9|          8|        16|         0.0|        0.0|
|[20.2725612, 20.2...|[495.6, 496.0, 49...|[67.8380757, 67.8...|[111, 111, 111, 1...|             8276277| null|156861290|     null|  33|      92|            10

In [26]:
# calculate mean for list column
import json
import numpy as np
import pyspark.sql.functions as F

array_mean = F.udf(lambda x: float(np.mean(json.loads(x))), 'float')
difference_range = F.udf(lambda x: float(np.max(json.loads(x))-np.min(json.loads(x))), 'float')
df2 = final_df.withColumn('heart_rate', array_mean('heart_rate')) \
.withColumn("longitude", difference_range('longitude')) \
.withColumn("altitude", difference_range('altitude')) \
.withColumn("latitude", difference_range('latitude')) \
.withColumn('userId', array_mean('userId'))
from pyspark.sql.types import DoubleType
final_df = df2.withColumn("speed", df2["speed"].cast(DoubleType()))\
.withColumn("type",df2["type"].cast(DoubleType())) \
.withColumn("humidity", df2["humidity"].cast(DoubleType())) \
.withColumn("wind_direction", df2["wind_direction"].cast(DoubleType())) \
.withColumn("temperature", df2["temperature"].cast(DoubleType())) \
.withColumn("wind_speed", df2["wind_speed"].cast(DoubleType())) \
.withColumn("longitude", df2["longitude"].cast(DoubleType())) \
.withColumn("altitude", df2["altitude"].cast(DoubleType())) \
.withColumn("latitude", df2["latitude"].cast(DoubleType())) \
.withColumn("gender_index", df2["gender_index"].cast(DoubleType())) \
.withColumn("sport_index", df2["sport_index"].cast(DoubleType()))
final_df.show(10)

+--------------------+------------------+--------------------+----------+------------+-----+---------+---------+----+--------+--------------+-----------+----------+------------+-----------+
|           longitude|          altitude|            latitude|heart_rate|      userId|speed|       id|hydration|type|humidity|wind_direction|temperature|wind_speed|gender_index|sport_index|
+--------------------+------------------+--------------------+----------+------------+-----+---------+---------+----+--------+--------------+-----------+----------+------------+-----------+
|0.024198299273848534|              45.0|0.011305700056254864|   148.734|1.35197069E9| null|108799137|     null| 3.0|    89.0|           9.0|        8.0|      16.0|         0.0|        0.0|
| 0.06210609897971153|47.599998474121094|0.023440299555659294|   165.284|   8276277.0| null|156861290|     null|33.0|    92.0|          10.0|       -5.0|      16.0|         0.0|        4.0|
| 0.04054969921708107|  85.4000015258789| 0.026724

In [27]:
# turn features to vector
from pyspark.ml.feature import VectorAssembler
assemble=VectorAssembler(inputCols=["heart_rate","userId","type","humidity","wind_direction",
"temperature","wind_speed","gender_index"],outputCol = 'features_raw')

assembled_data=assemble.transform(final_df).select("sport_index", "features_raw")\
.withColumnRenamed("sport_index","label")
assembled_data.show(2)

+-----+--------------------+
|label|        features_raw|
+-----+--------------------+
|  0.0|[148.733993530273...|
|  4.0|[165.283996582031...|
+-----+--------------------+
only showing top 2 rows



In [28]:
# scale the data
from pyspark.ml.feature import StandardScaler
scale=StandardScaler(inputCol='features_raw',outputCol='features')
data_scale=scale.fit(assembled_data)
data_scale_output=data_scale.transform(assembled_data).drop('features_raw')
data_scale_output.show(2)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|[8.69628789764878...|
|  4.0|[9.66394557851215...|
+-----+--------------------+
only showing top 2 rows



In [29]:
# split train and test
train, test, val = data_scale_output.randomSplit([0.6, 0.2, 0.2],seed= 24)

In [30]:
# set model
from pyspark.ml.clustering import GaussianMixture

gmm = GaussianMixture().setK(10).setSeed(538009335)
model = gmm.fit(train)

print("Gaussians shown as a DataFrame: ")
model.gaussiansDF.show(truncate=False)

Gaussians shown as a DataFrame: 
+------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|mean                                                                                                                                                        |cov                                                                                                                                                                                                                     

In [31]:
predictions = model.transform(test)
predictions.show(5)

+-----+--------------------+--------------------+----------+
|label|            features|         probability|prediction|
+-----+--------------------+--------------------+----------+
|  0.0|[7.91771860758082...|[4.31914615691030...|         5|
|  0.0|[7.96718363956691...|[1.65329757206793...|         2|
|  0.0|[7.99594962012978...|[1.81538357930711...|         9|
|  0.0|[8.24292148389355...|[3.87500225142086...|         5|
|  0.0|[8.32711659966066...|[1.77421341704667...|         9|
+-----+--------------------+--------------------+----------+
only showing top 5 rows



In [None]:

print((predictions.where(predictions.label == predictions.prediction).count())/predictions.count())
  


In [None]:
# deal with large dataset
#import gzip
#data = []
#with gzip.open('endomondoHR.json.gz') as f:
#with open('/content/drive/MyDrive/endomondoMeta.json') as f:
    #i = 1
    #for l in f:
      #  if i % 100000!=0:
       #     data.append(eval(l))
        #    i+=1
       # else:
        #    path = '/content/drive/MyDrive/'+str(i//100000)+'.csv'
         #   pd.DataFrame(data)[['weather','id','hydration']].to_csv(path)
         #   i+=1
         #   data = []
         #   print(i)


50001
100001
150001
