In [327]:
from pyspark import SparkContext, SparkConf
from pyspark.sql.types import (StructField,StructType, BooleanType, DoubleType,LongType)
from pyspark.sql import functions as F
from pyspark.sql.functions import col, when, to_timestamp, unix_timestamp,udf
from pyspark.sql.types import DoubleType, IntegerType, ArrayType, StringType, FloatType, DataType, TimestampType, Row
import inspect

In [128]:
rdd_data = sc.textFile("./VGP-week3-data.csv")

#inspect.getmembers(rdd_data, predicate=inspect.ismethod(rdd_data))

In [130]:
data_schema = rdd_data.first()

def remove_header(itr_index, itr):
    return iter(list(itr)[1:]) if itr_index == 0 else itr

rdd_data = rdd_data.mapPartitionsWithIndex(remove_header)
#rdd_data = rdd_data.filter(lambda line : line != data_schema)

data_schema [5] = "PM25"

data_schema [7] = "PM100"

data_schema

['kit_id',
 'participant_id',
 'time',
 'lat',
 'lon',
 'PM25',
 'PM10',
 'PM100',
 'NO2',
 'BC',
 'activity',
 'event']

In [131]:
rdd_data.first()

['80',
 '9999964',
 '2019-11-14 09:00:00+00',
 '48.7717766666667',
 '2.006005',
 'NULL',
 'NULL',
 'NULL',
 'NULL',
 'NULL',
 'NULL',
 'NULL']

In [132]:
# convert to float 
def to_float(value):
    if value == "NULL":
        return None
    elif value == "Null" :
        return None
    else:
        return float(value)
    
rdd_data = rdd_data.map(lambda l : [int(l[0]),int(l[1]),l[2],to_float(l[3]),to_float(l[4]),to_float(l[5]),to_float(l[6]),to_float(l[7]),to_float(l[8]),to_float(l[9]),l[10],l[11]])
rdd_data.first()

[80,
 9999964,
 '2019-11-14 09:00:00+00',
 48.7717766666667,
 2.006005,
 None,
 None,
 None,
 None,
 None,
 'NULL',
 'NULL']

In [133]:
schema = StructType([StructField(data_schema[0], IntegerType(), True),
                     StructField(data_schema[1], IntegerType(), True),
                     StructField(data_schema[2], StringType(), True),
                     StructField(data_schema[3], FloatType(), True),
                     StructField(data_schema[4], FloatType(), True),
                     StructField(data_schema[5], FloatType(), True),
                     StructField(data_schema[6], FloatType(), True),
                     StructField(data_schema[7], FloatType(), True),
                     StructField(data_schema[8], FloatType(), True),
                     StructField(data_schema[9], FloatType(), True),
                     StructField(data_schema[10], StringType(), True),
                     StructField(data_schema[11], StringType(), True)])             
                     
df_data=rdd_data.toDF(schema)


In [134]:
df_data.printSchema()

root
 |-- kit_id: integer (nullable = true)
 |-- participant_id: integer (nullable = true)
 |-- time: string (nullable = true)
 |-- lat: float (nullable = true)
 |-- lon: float (nullable = true)
 |-- PM25: float (nullable = true)
 |-- PM10: float (nullable = true)
 |-- PM100: float (nullable = true)
 |-- NO2: float (nullable = true)
 |-- BC: float (nullable = true)
 |-- activity: string (nullable = true)
 |-- event: string (nullable = true)



In [135]:
df_data.show(5)

+------+--------------+--------------------+--------+--------+----+----+-----+----+----+--------+-----+
|kit_id|participant_id|                time|     lat|     lon|PM25|PM10|PM100| NO2|  BC|activity|event|
+------+--------------+--------------------+--------+--------+----+----+-----+----+----+--------+-----+
|    80|       9999964|2019-11-14 09:00:...|48.77178|2.006005|null|null| null|null|null|    NULL| NULL|
|    80|       9999964|2019-11-14 09:00:...|48.77178|2.006005|null|null| null|null|null|    NULL| NULL|
|    80|       9999964|2019-11-14 09:00:...|48.77178|2.006005|null|null| null|null|null|    NULL| NULL|
|    80|       9999964|2019-11-14 09:00:...|48.77178|2.006005|null|null| null|null|null|    NULL| NULL|
|    80|       9999964|2019-11-14 09:00:...|48.77178|2.006005|null|null| null|null|null|    NULL| NULL|
+------+--------------+--------------------+--------+--------+----+----+-----+----+----+--------+-----+
only showing top 5 rows



In [136]:
df_data = df_data.withColumn(data_schema[2], to_timestamp(data_schema[2], "yyyy-MM-dd hh:mm:ss"))

In [137]:
df_data.printSchema()

root
 |-- kit_id: integer (nullable = true)
 |-- participant_id: integer (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- lat: float (nullable = true)
 |-- lon: float (nullable = true)
 |-- PM25: float (nullable = true)
 |-- PM10: float (nullable = true)
 |-- PM100: float (nullable = true)
 |-- NO2: float (nullable = true)
 |-- BC: float (nullable = true)
 |-- activity: string (nullable = true)
 |-- event: string (nullable = true)



In [173]:
df_data.filter(df_data["participant_id"] == 9999920).filter(df_data["PM100"].isNotNull()).count()

1505

In [279]:
Ureal_format = [StructField('a', IntegerType(),True),
              StructField('b', DoubleType(),True),
              StructField('c', DoubleType(),True),
              StructField('r', BooleanType(),True),
              StructField('t1', TimestampType(),True),
              StructField('t2', TimestampType(),True)]
UrealType = StructType(Ureal_format)

In [140]:
Position_format = [StructField('lat', DoubleType(),True),
              StructField('lon', DoubleType(),True)]
PositionType = StructType(Position_format)

In [268]:
USreal_format = [StructField('a', IntegerType(),True),
              StructField('b', DoubleType(),True),
              StructField('c', DoubleType(),True),
              StructField('r', BooleanType(),True),
              StructField('p1', PositionType,True),
              StructField('p2', PositionType,True)]
USrealType = StructType(USreal_format)

In [280]:
Mreal_format = [StructField('time_units', ArrayType(UrealType),True)]
MrealType = StructType(Mreal_format)

In [143]:
MSreal_format = [StructField('space_units', ArrayType(USrealType),True)]
MSrealType = StructType(MSreal_format)

In [272]:
PM100_temp = spark.createDataFrame(sc.emptyRDD(), StructType([StructField("id_participant", IntegerType(), False), StructField("PM100_profil_temp", MrealType, True)]))

In [273]:
PM100_temp.printSchema()

root
 |-- id_participant: integer (nullable = false)
 |-- PM100_profil_temp: struct (nullable = true)
 |    |-- time_units: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- a: double (nullable = true)
 |    |    |    |-- b: double (nullable = true)
 |    |    |    |-- c: double (nullable = true)
 |    |    |    |-- r: boolean (nullable = true)
 |    |    |    |-- t1: long (nullable = true)
 |    |    |    |-- t2: long (nullable = true)



In [402]:
PM100_test = df_data.filter(df_data["PM100"].isNotNull()).filter(df_data["time"].isNotNull()).select("participant_id","time","PM100")

In [403]:
PM100_test.show(5)

+--------------+-------------------+-----+
|participant_id|               time|PM100|
+--------------+-------------------+-----+
|       9999964|2019-11-15 09:03:00|  4.0|
|       9999964|2019-11-15 09:03:10|  4.0|
|       9999964|2019-11-15 09:03:20|  4.0|
|       9999964|2019-11-15 09:03:30|  4.0|
|       9999964|2019-11-15 09:03:40|  4.0|
+--------------+-------------------+-----+
only showing top 5 rows



In [404]:
PM100_test = PM100_test.groupBy("participant_id").agg(F.collect_list(col('time')).alias('time'),F.collect_list(col('PM100')).alias('PM100'))

In [None]:
PM100_test = PM100_test.groupBy("participant_id").agg(F.collect_list(col('time')).alias('time'),F.collect_list(col('PM100')).alias('PM100'))

In [161]:
PM100_test.head()

Row(participant_id=9999920, time=[datetime.datetime(2019, 11, 15, 10, 36, 40), datetime.datetime(2019, 11, 15, 10, 36, 50), datetime.datetime(2019, 11, 15, 10, 37), datetime.datetime(2019, 11, 15, 10, 37, 10), datetime.datetime(2019, 11, 15, 10, 37, 20), datetime.datetime(2019, 11, 15, 10, 37, 30), datetime.datetime(2019, 11, 15, 10, 37, 40), datetime.datetime(2019, 11, 15, 10, 37, 50), datetime.datetime(2019, 11, 15, 10, 38), datetime.datetime(2019, 11, 15, 10, 38, 10), datetime.datetime(2019, 11, 15, 10, 38, 20), datetime.datetime(2019, 11, 15, 10, 38, 30), datetime.datetime(2019, 11, 15, 10, 38, 40), datetime.datetime(2019, 11, 15, 10, 38, 50), datetime.datetime(2019, 11, 15, 10, 39), datetime.datetime(2019, 11, 15, 10, 39, 10), datetime.datetime(2019, 11, 15, 10, 39, 20), datetime.datetime(2019, 11, 15, 10, 39, 30), datetime.datetime(2019, 11, 15, 10, 39, 40), datetime.datetime(2019, 11, 15, 10, 39, 50), datetime.datetime(2019, 11, 15, 10, 40), datetime.datetime(2019, 11, 15, 10, 4

In [418]:
def interpolate(time_serie_row, measures_row):
    ureal_list=[]
    for i in range(min(len(measures_row),len(time_serie_row))-1):
            a=0
            if (time_serie_row[i]-time_serie_row[i+1]).seconds == 0:
                i=i+1
            else:
                b=(measures_row[i]-measures_row[i+1])/(time_serie_row[i]-time_serie_row[i+1]).seconds
                c=measures_row[i]-time_serie_row[i].second*(measures_row[i]-measures_row[i+1])/(time_serie_row[i]-time_serie_row[i+1]).seconds
            r=False
            new_ureal = [a,b,c,r,time_serie_row[i],time_serie_row[i+1]]
            ureal_list.append(new_ureal)
    return Row(one_participant_profile = ureal_list)

# testing interpolate function 

list_time = list(PM100_test.select('time').head())[0]
list_measure = list(PM100_test.select('PM100').head())[0]
result = interpolate (list_time, list_measure)


def minimal_interval(row):
    diff_values=[]
    for i in range(len(row)-1):
        diff_values.append((row[i+1]-row[i]).seconds)
    return min(diff_values)

def traj_duration(row):
    return (max(row)-min(row)).seconds





In [378]:
# example for a row in the Dataframe
spark.createDataFrame(sc.parallelize([result]),MrealType).head()


Row(time_units=[Row(a=0, b=0.0, c=19.0, r=False, t1=datetime.datetime(2019, 11, 15, 10, 36, 40), t2=datetime.datetime(2019, 11, 15, 10, 36, 50)), Row(a=0, b=3.4726241463132305e-05, c=18.998263687926844, r=False, t1=datetime.datetime(2019, 11, 15, 10, 36, 50), t2=datetime.datetime(2019, 11, 15, 10, 37)), Row(a=0, b=0.0, c=16.0, r=False, t1=datetime.datetime(2019, 11, 15, 10, 37), t2=datetime.datetime(2019, 11, 15, 10, 37, 10)), Row(a=0, b=0.0, c=16.0, r=False, t1=datetime.datetime(2019, 11, 15, 10, 37, 10), t2=datetime.datetime(2019, 11, 15, 10, 37, 20)), Row(a=0, b=0.0, c=16.0, r=False, t1=datetime.datetime(2019, 11, 15, 10, 37, 20), t2=datetime.datetime(2019, 11, 15, 10, 37, 30)), Row(a=0, b=0.0, c=16.0, r=False, t1=datetime.datetime(2019, 11, 15, 10, 37, 30), t2=datetime.datetime(2019, 11, 15, 10, 37, 40)), Row(a=0, b=0.0, c=16.0, r=False, t1=datetime.datetime(2019, 11, 15, 10, 37, 40), t2=datetime.datetime(2019, 11, 15, 10, 37, 50)), Row(a=0, b=1.1575413821044103e-05, c=15.999421229

In [321]:
type(str(type(list_time))),type(result),MrealType

(str,
 pyspark.sql.types.Row,
 StructType(List(StructField(time_units,ArrayType(StructType(List(StructField(a,IntegerType,true),StructField(b,DoubleType,true),StructField(c,DoubleType,true),StructField(r,BooleanType,true),StructField(t1,TimestampType,true),StructField(t2,TimestampType,true))),true),true))))

In [304]:
# profile temporelle d'un utilisateur 
PM100_test.select("participant_id").head(),result

(Row(participant_id=9999920),
 Row(one_participant_profile=[[0, 0.0, 19.0, False, datetime.datetime(2019, 11, 15, 10, 36, 40), datetime.datetime(2019, 11, 15, 10, 36, 50)], [0, 3.4726241463132305e-05, 18.998263687926844, False, datetime.datetime(2019, 11, 15, 10, 36, 50), datetime.datetime(2019, 11, 15, 10, 37)], [0, 0.0, 16.0, False, datetime.datetime(2019, 11, 15, 10, 37), datetime.datetime(2019, 11, 15, 10, 37, 10)], [0, 0.0, 16.0, False, datetime.datetime(2019, 11, 15, 10, 37, 10), datetime.datetime(2019, 11, 15, 10, 37, 20)], [0, 0.0, 16.0, False, datetime.datetime(2019, 11, 15, 10, 37, 20), datetime.datetime(2019, 11, 15, 10, 37, 30)], [0, 0.0, 16.0, False, datetime.datetime(2019, 11, 15, 10, 37, 30), datetime.datetime(2019, 11, 15, 10, 37, 40)], [0, 0.0, 16.0, False, datetime.datetime(2019, 11, 15, 10, 37, 40), datetime.datetime(2019, 11, 15, 10, 37, 50)], [0, 1.1575413821044103e-05, 15.999421229308949, False, datetime.datetime(2019, 11, 15, 10, 37, 50), datetime.datetime(2019, 

In [406]:
udfInterpolate = udf(interpolate, MrealType)

udfMinimalInterval = udf(minimal_interval, IntegerType())

udfTrajDuration = udf(traj_duration, IntegerType())

In [407]:
PM100_test_profiles = PM100_test.withColumn('Ureal_data', udfInterpolate('time','PM100'))

In [408]:
PM100_test_minimal = PM100_test.withColumn('minimal', udfMinimalInterval('time'))

In [409]:
PM100_test_duration = PM100_test.withColumn('duration', udfTrajDuration('time'))

In [415]:
PM100_test_profiles.select('participant_id','Ureal_data').show()

+--------------+--------------------+
|participant_id|          Ureal_data|
+--------------+--------------------+
|       9999920|[[[0, 0.0, 19.0, ...|
|       9999955|[[[0, 0.0, 3.0, f...|
|       9999975|[[[0, 0.0, 25.0, ...|
|       9999960|[[[0, 2.315618849...|
|       9999964|[[[0, 0.0, 4.0, f...|
+--------------+--------------------+



In [416]:
PM100_test_minimal.select('participant_id','minimal').show()

+--------------+-------+
|participant_id|minimal|
+--------------+-------+
|       9999920|     10|
|       9999955|     10|
|       9999975|      0|
|       9999960|     30|
|       9999964|     10|
+--------------+-------+



In [417]:
PM100_test4.select('participant_id','duration').show()

+--------------+--------+
|participant_id|duration|
+--------------+--------+
|       9999920|   43130|
|       9999955|   43130|
|       9999975|   42390|
|       9999960|   41530|
|       9999964|   42790|
+--------------+--------+

