In [1]:
import findspark

# PostgreSQL JDBC driver jar. It is registered here, before the SparkSession is
# built in the next cell, presumably so the JVM starts with it on the classpath.
POSTGRES_JDBC_JAR = '/app/postgresql-42.1.4.jar'

findspark.add_jars(POSTGRES_JDBC_JAR)
findspark.init()

In [2]:
from pyspark.sql import SparkSession

# Spark session used by every cell below.
# spark.driver.memory used to be set twice ("512m", then "15g"). Builder options
# are a key -> value map, so the later "15g" silently won. Only that effective
# value is kept here. Driver settings only take effect if no SparkContext/JVM is
# already running when getOrCreate() is called.
spark = (
    SparkSession.builder
    # NOTE(review): app name looks copied from another project -- confirm.
    .appName("argentinaDatosAdd")
    .config("spark.driver.memory", "15g")
    # Per the Spark configuration docs, spark.driver.cores is only used in cluster mode.
    .config("spark.driver.cores", "1")
    .config("spark.executor.memory", "512m")
    .config("spark.executor.cores", "1")
    # Far below Spark's default of 200. Presumably chosen because the per-user
    # aggregates are small -- TODO confirm.
    .config("spark.sql.shuffle.partitions", "2")
    .getOrCreate()
)

In [34]:
from pyspark.sql.functions import *;
from pyspark.sql.types import *;
from scipy.stats import *;
import pyspark.sql.functions as F

In [7]:
# Create dataframe from table in Postgres DB (human_activity.ha_dataset).
# The password is no longer hard-coded. It is read from the HA_DB_PASSWORD
# environment variable, and the user is prompted for it if the variable is unset.
# NOTE(review): the previously hard-coded password remains in version history -- rotate it.
import getpass
import os

if "HA_DB_PASSWORD" not in os.environ:
    # Cache the prompted value in this process's environment so later cells
    # can reuse it without prompting again.
    os.environ["HA_DB_PASSWORD"] = getpass.getpass("Password for Postgres user ha_user: ")

df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:postgresql://postgres/ha_db")
    .option("driver", "org.postgresql.Driver")
    .option("dbtable", "human_activity.ha_dataset")
    .option("user", "ha_user")
    .option("password", os.environ["HA_DB_PASSWORD"])
    .load()
)

# Create aggregated dataframes by user_id

In [28]:
# Aggregate the label columns per user_id.
# - number_examples: count of the user's rows with a non-null timestamp.
# - each lab_* column: sum of that label over the user's rows. This is the number
#   of examples marked with the activity, assuming labels are 0/1 -- TODO confirm.
# The output keeps the label's original column name.
label_columns = [
    "lab_FIX_walking",
    "lab_FIX_running",
    "lab_BICYCLING",
    "lab_IN_A_CAR",
    "lab_ON_A_BUS",
    "lab_DRIVE_I_M_THE_DRIVER",
    "lab_DRIVE_I_M_A_PASSENGER",
    "lab_STROLLING",
    "lab_STAIRS_GOING_UP",
    "lab_STAIRS_GOING_DOWN",
    "lab_new_geographical_displac",
    "lab_new_geographical_locat",
]

label_sums = [F.sum(label).alias(label) for label in label_columns]

user_agg_lab = (
    df.groupBy("user_id")
    .agg(F.count("timestamp").alias("number_examples"), *label_sums)
    .orderBy(F.desc("user_id"))
)

user_agg_lab.show(n=1, vertical=True)

-RECORD 0--------------------------------------------
 user_id                      | A76A5AF5-5A93-4CF... 
 number_examples              | 7520                 
 lab_FIX_walking              | 800                  
 lab_FIX_running              | 88                   
 lab_BICYCLING                | 56                   
 lab_IN_A_CAR                 | 250                  
 lab_ON_A_BUS                 | 16                   
 lab_DRIVE_I_M_THE_DRIVER     | 202                  
 lab_DRIVE_I_M_A_PASSENGER    | 0                    
 lab_STROLLING                | 0                    
 lab_STAIRS_GOING_UP          | 0                    
 lab_STAIRS_GOING_DOWN        | 1                    
 lab_new_geographical_displac | 1225                 
 lab_new_geographical_locat   | 6281                 
only showing top 1 row



In [31]:
# Create aggregated table by user_id for each sensor column.
# - number_examples: count of the user's rows with a non-null timestamp.
# - <column>_notnull: number of the user's rows in which that sensor feature is
#   not null.
#
# Bug fix: this used to be F.count(F.col(c).isNotNull()). isNotNull() yields
# true/false and is never null itself, so count() counted every row, and each
# *_notnull column just repeated number_examples. F.count(<column>) skips nulls,
# which gives the intended per-column non-null count.
# Output column names and order are unchanged (the join and DB table depend on them).
sensor_columns = [
    # phone accelerometer
    "acc_magnitude_mean",
    "acc_magnitude_std",
    "acc_magnitude_value_entropy",
    "acc_magnitude_time_entropy",
    "acc_spec_spectral_entropy",
    "acc_3d_mean_x",
    "acc_3d_mean_y",
    "acc_3d_mean_z",
    "acc_3d_std_x",
    "acc_3d_std_y",
    "acc_3d_std_z",
    "acc_3d_ro_x",
    "acc_3d_ro_y",
    "acc_3d_ro_z",
    # gyroscope
    "gyro_magnitude_mean",
    "gyro_magnitude_std",
    "gyro_magnitude_value_entropy",
    "gyro_magnitude_time_entropy",
    "gyro_spec_spectral_entropy",
    "gyro_3d_mean_x",
    "gyro_3d_mean_y",
    "gyro_3d_mean_z",
    "gyro_3d_std_x",
    "gyro_3d_std_y",
    "gyro_3d_std_z",
    "gyro_3d_ro_xy",
    "gyro_3d_ro_xz",
    "gyro_3d_ro_yz",
    # magnetometer
    "magnet_magnitude_mean",
    "magnet_magnitude_std",
    "magnet_magnitude_value_entropy",
    "magnet_magnitude_time_entropy",
    "magnet_spec_spectral_entropy",
    "magnet_3d_mean_x",
    "magnet_3d_mean_y",
    "magnet_3d_mean_z",
    "magnet_3d_std_x",
    "magnet_3d_std_y",
    "magnet_3d_std_z",
    "magnet_3d_ro_xy",
    "magnet_3d_ro_xz",
    "magnet_3d_ro_yz",
    # watch accelerometer
    "acc_watch_magnitude_mean",
    "acc_watch_magnitude_std",
    "acc_watch_magnitude_value_entropy",
    "acc_watch_magnitude_time_entropy",
    "acc_watch_spec_spectral_entropy",
    "acc_watch_3d_mean_x",
    "acc_watch_3d_mean_y",
    "acc_watch_3d_mean_z",
    "acc_watch_3d_std_x",
    "acc_watch_3d_std_y",
    "acc_watch_3d_std_z",
    "acc_watch_3d_ro_xy",
    "acc_watch_3d_ro_xz",
    "acc_watch_3d_ro_yz",
    # location
    "loc_valid_updates",
    "loc_log_latitude_range",
    "loc_log_longitude_range",
    "loc_min_altitude",
    "loc_max_altitude",
    "loc_best_horizontal_accuracy",
    "loc_best_vertical_accuracy",
    "loc_diameter",
    "loc_log_diameter",
    "loc_features_std_lat",
    "loc_features_std_long",
    "loc_features_lat_change",
    "loc_features_log_change",
    "loc_features_mean_abs_lat_deriv",
    "loc_features_mean_abs_long_deriv",
]

# count(<column>) ignores nulls, so this is the per-user non-null count.
notnull_counts = [
    F.count(F.col(column)).alias(column + "_notnull") for column in sensor_columns
]

user_agg_sens = (
    df.groupBy("user_id")
    .agg(F.count("timestamp").alias("number_examples"), *notnull_counts)
    .orderBy(F.desc("user_id"))
)

user_agg_sens.show(n=1, vertical=True)

-RECORD 0---------------------------------------------------------
 user_id                                   | A76A5AF5-5A93-4CF... 
 number_examples                           | 7520                 
 acc_magnitude_mean_notnull                | 7520                 
 acc_magnitude_std_notnull                 | 7520                 
 acc_magnitude_value_entropy_notnull       | 7520                 
 acc_magnitude_time_entropy_notnull        | 7520                 
 acc_spec_spectral_entropy_notnull         | 7520                 
 acc_3d_mean_x_notnull                     | 7520                 
 acc_3d_mean_y_notnull                     | 7520                 
 acc_3d_mean_z_notnull                     | 7520                 
 acc_3d_std_x_notnull                      | 7520                 
 acc_3d_std_y_notnull                      | 7520                 
 acc_3d_std_z_notnull                      | 7520                 
 acc_3d_ro_x_notnull                       | 7520             

In [43]:
# Combine the label and sensor aggregates into one row per user_id.
# Both inputs are df.groupBy('user_id') results, joined here with an inner
# equi-join on user_id. Rows with a null user_id, if any, drop out of the join.
# Output columns:
#   - every column of user_agg_lab (user_id, number_examples, lab_*), followed by
#   - every column of user_agg_sens except its duplicate user_id / number_examples,
#   each in its source DataFrame's column order.
sens_value_columns = [
    name for name in user_agg_sens.columns
    if name not in ("user_id", "number_examples")
]

joined_per_user = user_agg_lab.join(
    user_agg_sens, user_agg_lab.user_id == user_agg_sens.user_id
)

user_agg = joined_per_user.select(
    [user_agg_lab[name] for name in user_agg_lab.columns]
    + [user_agg_sens[name] for name in sens_value_columns]
)

user_agg.show(n=1, vertical=True)

-RECORD 0---------------------------------------------------------
 user_id                                   | 136562B6-95B2-483... 
 number_examples                           | 6218                 
 lab_FIX_walking                           | 765                  
 lab_FIX_running                           | 0                    
 lab_BICYCLING                             | 0                    
 lab_IN_A_CAR                              | 165                  
 lab_ON_A_BUS                              | 0                    
 lab_DRIVE_I_M_THE_DRIVER                  | 0                    
 lab_DRIVE_I_M_A_PASSENGER                 | 40                   
 lab_STROLLING                             | 0                    
 lab_STAIRS_GOING_UP                       | 30                   
 lab_STAIRS_GOING_DOWN                     | 30                   
 lab_new_geographical_displac              | 960                  
 lab_new_geographical_locat                | 4353             

# Persist outputs in Postgres DB

In [32]:
# Persist the per-user label aggregates into human_activity.user_agg_lab.
# mode('overwrite') overwrites any existing table of that name.
# The password is read from HA_DB_PASSWORD (prompted for if unset) instead of
# being hard-coded in the notebook.
import getpass
import os

if "HA_DB_PASSWORD" not in os.environ:
    # Cache the prompted value so other cells in this kernel can reuse it.
    os.environ["HA_DB_PASSWORD"] = getpass.getpass("Password for Postgres user ha_user: ")

(
    user_agg_lab.write
    .format("jdbc")
    .option("url", "jdbc:postgresql://postgres/ha_db")
    .option("dbtable", "human_activity.user_agg_lab")
    .option("user", "ha_user")
    .option("password", os.environ["HA_DB_PASSWORD"])
    .option("driver", "org.postgresql.Driver")
    .mode('overwrite')
    .save()
)

In [33]:
# Persist the per-user sensor non-null counts into human_activity.user_agg_sens.
# mode('overwrite') overwrites any existing table of that name.
# The password is read from HA_DB_PASSWORD (prompted for if unset) instead of
# being hard-coded in the notebook.
import getpass
import os

if "HA_DB_PASSWORD" not in os.environ:
    # Cache the prompted value so other cells in this kernel can reuse it.
    os.environ["HA_DB_PASSWORD"] = getpass.getpass("Password for Postgres user ha_user: ")

(
    user_agg_sens.write
    .format("jdbc")
    .option("url", "jdbc:postgresql://postgres/ha_db")
    .option("dbtable", "human_activity.user_agg_sens")
    .option("user", "ha_user")
    .option("password", os.environ["HA_DB_PASSWORD"])
    .option("driver", "org.postgresql.Driver")
    .mode('overwrite')
    .save()
)

In [44]:
# Persist the combined per-user table (labels + sensor counts) into human_activity.user_agg.
# mode('overwrite') overwrites any existing table of that name.
# The password is read from HA_DB_PASSWORD (prompted for if unset) instead of
# being hard-coded in the notebook.
import getpass
import os

if "HA_DB_PASSWORD" not in os.environ:
    # Cache the prompted value so other cells in this kernel can reuse it.
    os.environ["HA_DB_PASSWORD"] = getpass.getpass("Password for Postgres user ha_user: ")

(
    user_agg.write
    .format("jdbc")
    .option("url", "jdbc:postgresql://postgres/ha_db")
    .option("dbtable", "human_activity.user_agg")
    .option("user", "ha_user")
    .option("password", os.environ["HA_DB_PASSWORD"])
    .option("driver", "org.postgresql.Driver")
    .mode('overwrite')
    .save()
)

In [None]:
# Shut down the SparkSession (and its underlying SparkContext) once all tables are written.
spark.stop()