In [1]:
from pyspark.sql.functions import col, when, to_timestamp, unix_timestamp
from pyspark.sql.types import DoubleType

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
# Reuse the running SparkContext if there is one, and expose the matching SparkSession as
# `spark`: later cells call spark.read.csv(...) and spark.createDataFrame(...), so the
# notebook fails on a fresh kernel if `spark` is never defined.
sc = SparkContext.getOrCreate()
spark = SparkSession.builder.getOrCreate()

In [2]:
from pyspark.sql.types import (StructField,StructType, BooleanType, DoubleType,LongType, IntegerType)
from pyspark.sql.types import *

In [3]:
#///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
#///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
#///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
#///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

In [4]:
# Type SPOINT: a spatial point given by its latitude and longitude (single-precision floats).
spoint_schema = [StructField(axis, FloatType(), True) for axis in ('lat', 'lon')]
spoint = StructType(fields=spoint_schema)

In [5]:
# Type SECTION: two coordinate pairs, (lat1, lon1) followed by (lat2, lon2).
section_schema = [StructField(coord + end, FloatType(), True)
                  for end in ('1', '2')
                  for coord in ('lat', 'lon')]
section = StructType(fields=section_schema)

In [6]:
# Type SLINE: a single field 'rints' holding an array of SECTION values.
sline_schema = [StructField(name='rints', dataType=ArrayType(section), nullable=True)]
sline = StructType(fields=sline_schema)

In [7]:
# Type USPOINT: time bounds t1, t2 (longs) followed by two coordinate pairs
# (lat1, lon1) and (lat2, lon2).
uspoint_schema = (
    [StructField(bound, LongType(), True) for bound in ('t1', 't2')]
    + [StructField(coord + end, FloatType(), True)
       for end in ('1', '2')
       for coord in ('lat', 'lon')]
)
uspoint = StructType(fields=uspoint_schema)

In [8]:
# Type MSPOINT: a single field 'rints' holding an array of USPOINT units.
mspoint_schema = [StructField(name='rints', dataType=ArrayType(uspoint), nullable=True)]
mspoint = StructType(fields=mspoint_schema)

In [9]:
# Type UINT: an integer value 'val' with time bounds t1, t2.
uint_schema = [StructField('val', IntegerType(), True)]
uint_schema += [StructField(bound, LongType(), True) for bound in ('t1', 't2')]
uint = StructType(fields=uint_schema)

In [10]:
#///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
#///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
#///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
#///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

In [11]:
# Type MINT: a single field 'units' holding an array of UINT units.
mint_schema = [StructField(name='units', dataType=ArrayType(uint), nullable=True)]
mint = StructType(fields=mint_schema)

In [12]:
# Type USTRING: a string value 'val' with time bounds t1, t2.
ustring_schema = [StructField('val', StringType(), True)]
ustring_schema += [StructField(bound, LongType(), True) for bound in ('t1', 't2')]
ustring = StructType(fields=ustring_schema)

In [13]:
# Type MSTRING: a single field 'units' holding an array of USTRING units.
mstring_schema = [StructField(name='units', dataType=ArrayType(ustring), nullable=True)]
mstring = StructType(fields=mstring_schema)

In [14]:
# Type UREAL: coefficients a, b, c, a boolean flag r, and time bounds t1, t2.
# get_MReal fills (a, b, c) so that b*t + c interpolates between two samples, with t in
# epoch seconds (~1.5e9). The intercept c is therefore of order 1e8-1e9, which single
# precision (FloatType, ~7 significant digits) cannot store without errors of tens of units.
# For that reason the coefficients are stored as DoubleType.
ureal_schema = [StructField('a', DoubleType(), True),
                StructField('b', DoubleType(), True),
                StructField('c', DoubleType(), True),
                StructField('r', BooleanType(), True),
                StructField('t1', LongType(), True),
                StructField('t2', LongType(), True)]
ureal = StructType(fields=ureal_schema)

In [15]:
# Type MREAL: a single field 'units' holding an array of UREAL units.
mreal_schema = [StructField(name='units', dataType=ArrayType(ureal), nullable=True)]
mreal = StructType(fields=mreal_schema)

In [16]:
# Type USINT: an integer value 'val' attached to a SECTION stored in 'interval'.
usint_schema = [StructField(name='val', dataType=IntegerType(), nullable=True),
                StructField(name='interval', dataType=section, nullable=True)]
usint = StructType(fields=usint_schema)

In [17]:
#///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
#///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
#///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
#///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

In [18]:
# Type MSINT: a single field 'units' holding an array of USINT units.
# Unlike MINT, the 'units' field is declared non-nullable.
msint_schema = [StructField(name='units', dataType=ArrayType(usint), nullable=False)]
msint = StructType(fields=msint_schema)

In [19]:
# Type USSTRING: a string value 'val' attached to a SECTION stored in 'interval'.
usstring_schema = [StructField(name='val', dataType=StringType(), nullable=True),
                   StructField(name='interval', dataType=section, nullable=True)]
usstring = StructType(fields=usstring_schema)

In [20]:
# Type MSSTRING: a single non-nullable field 'units' holding an array of USSTRING units.
msstring_schema = [StructField(name='units', dataType=ArrayType(usstring), nullable=False)]
msstring = StructType(fields=msstring_schema)

In [21]:
# Type USREAL: coefficients a, b, c, a boolean flag r, and a SECTION stored in 'interval'.
usreal_schema = (
    [StructField(coef, FloatType(), True) for coef in ('a', 'b', 'c')]
    + [StructField('r', BooleanType(), True),
       StructField('interval', section, True)]
)
usreal = StructType(fields=usreal_schema)

In [22]:
# Type MSREAL: a single non-nullable field 'units' holding an array of USREAL units.
msreal_schema = [StructField(name='units', dataType=ArrayType(usreal), nullable=False)]
msreal = StructType(fields=msreal_schema)

In [23]:
#///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
#///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
#///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
#///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

In [24]:
# Type INTIME: a float value 'val' paired with a single timestamp t1.
intime_schema = [StructField(name='val', dataType=FloatType(), nullable=True),
                 StructField(name='t1', dataType=LongType(), nullable=True)]
intime = StructType(fields=intime_schema)

In [25]:
# Type INSPOINT: a float value 'val' paired with a SPOINT stored in 'sp'.
inspoint_schema = [StructField(name='val', dataType=FloatType(), nullable=True),
                   StructField(name='sp', dataType=spoint, nullable=True)]
inspoint = StructType(fields=inspoint_schema)

In [26]:
#///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
#///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
#///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
#///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

In [27]:
# Load the raw data: the first CSV line holds the column names, and Spark infers the column types.
df = spark.read.csv("VGP-week3-data.csv", header=True, inferSchema=True)
df.printSchema()

root
 |-- kit_id: integer (nullable = true)
 |-- participant_id: integer (nullable = true)
 |-- time: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- PM2-5: string (nullable = true)
 |-- PM10: string (nullable = true)
 |-- PM1-0: string (nullable = true)
 |-- NO2: string (nullable = true)
 |-- BC: string (nullable = true)
 |-- activity: string (nullable = true)
 |-- event: string (nullable = true)



In [28]:
# Missing values appear in the CSV as the literal string "NULL"; replace them with real nulls
# so that later casts and isNotNull() filters treat them as missing.
for column_name in ("BC", "PM2-5", "PM10", "PM1-0", "NO2", "activity", "event"):
    is_null_marker = col(column_name) == "NULL"
    df = df.withColumn(column_name, when(is_null_marker, None).otherwise(col(column_name)))

In [29]:
# Replace the textual timestamp with the number of seconds since the Unix epoch.
# NOTE(review): unix_timestamp interprets the string in the Spark session time zone
# (spark.sql.session.timeZone). If the raw times are UTC, confirm the session is set to UTC.
df = df.withColumn('time', unix_timestamp(col('time'), 'yyyy-MM-dd HH:mm:ss'))
df.show()
df.printSchema()

+------+--------------+----------+----------------+----------------+-----+----+-----+----+----+--------+-----+
|kit_id|participant_id|      time|             lat|             lon|PM2-5|PM10|PM1-0| NO2|  BC|activity|event|
+------+--------------+----------+----------------+----------------+-----+----+-----+----+----+--------+-----+
|    80|       9999964|1573718400|48.7717766666667|        2.006005| null|null| null|null|null|    null| null|
|    80|       9999964|1573718410|48.7717766666667|        2.006005| null|null| null|null|null|    null| null|
|    80|       9999964|1573718420|48.7717766666667|        2.006005| null|null| null|null|null|    null| null|
|    80|       9999964|1573718430|48.7717766666667|        2.006005| null|null| null|null|null|    null| null|
|    80|       9999964|1573718440|48.7717766666667|        2.006005| null|null| null|null|null|    null| null|
|    80|       9999964|1573718450|48.7717766666667|        2.006005| null|null| null|null|null|    null| null|
|

In [30]:
# Cast the measurement columns and the coordinates to single-precision floats (FloatType),
# and make `time` a 64-bit integer.
# Note: the "+00" suffix (presumably the UTC offset) was stripped from every timestamp in the
# data file beforehand.
# NOTE(review): FloatType keeps about 7 significant digits, so lat/lon lose precision
# (48.7717766666667 is shown as 48.77178 after this cell). Confirm this is acceptable.
for column_name in ("PM2-5", "PM10", "PM1-0", "NO2", "BC", "lat", "lon"):
    df = df.withColumn(column_name, df[column_name].cast(FloatType()))
df = df.withColumn("time", df["time"].cast(LongType()))
df.printSchema()

root
 |-- kit_id: integer (nullable = true)
 |-- participant_id: integer (nullable = true)
 |-- time: long (nullable = true)
 |-- lat: float (nullable = true)
 |-- lon: float (nullable = true)
 |-- PM2-5: float (nullable = true)
 |-- PM10: float (nullable = true)
 |-- PM1-0: float (nullable = true)
 |-- NO2: float (nullable = true)
 |-- BC: float (nullable = true)
 |-- activity: string (nullable = true)
 |-- event: string (nullable = true)



In [31]:
def change_column_names(columns):
    """Return the given column names as a list, with every '-' replaced by '_'.

    Used to turn names such as 'PM2-5' and 'PM1-0' into 'PM2_5' and 'PM1_0'.
    """
    dash_to_underscore = str.maketrans('-', '_')
    return [name.translate(dash_to_underscore) for name in columns]

# Rename every column (e.g. 'PM2-5' -> 'PM2_5'), preview the result and count the rows.
clean_column_names = change_column_names(df.columns)
df = df.toDF(*clean_column_names)
df.show()
df.count()

+------+--------------+----------+---------+---------+-----+----+-----+----+----+--------+-----+
|kit_id|participant_id|      time|      lat|      lon|PM2_5|PM10|PM1_0| NO2|  BC|activity|event|
+------+--------------+----------+---------+---------+-----+----+-----+----+----+--------+-----+
|    80|       9999964|1573718400| 48.77178| 2.006005| null|null| null|null|null|    null| null|
|    80|       9999964|1573718410| 48.77178| 2.006005| null|null| null|null|null|    null| null|
|    80|       9999964|1573718420| 48.77178| 2.006005| null|null| null|null|null|    null| null|
|    80|       9999964|1573718430| 48.77178| 2.006005| null|null| null|null|null|    null| null|
|    80|       9999964|1573718440| 48.77178| 2.006005| null|null| null|null|null|    null| null|
|    80|       9999964|1573718450| 48.77178| 2.006005| null|null| null|null|null|    null| null|
|    80|       9999964|1573718460| 48.77178| 2.006005| null|null| null|null|null|    null| null|
|    80|       9999964|1573718

59972

In [43]:
# Distinct participant ids present in the data, and the column names get_MReal accepts.
liste_id = [row['participant_id'] for row in df.select('participant_id').distinct().collect()]
colonnes_names = 'PM2_5 PM10 PM1_0 NO2 BC activity event'.split()

In [44]:
# Print every participant id, then the supported column names, one list per line.
print(liste_id, colonnes_names, sep='\n')

[9999920, 9999955, 9999975, 9999936, 9999930, 9999960, 9999964, 9999962, 999992]
['PM2_5', 'PM10', 'PM1_0', 'NO2', 'BC', 'activity', 'event']


In [101]:
def get_MReal(id_particip, colonne, step_seconds=10):
    """Build the unit-by-unit temporal representation of one column for one participant.

    Each non-null sample of ``colonne`` for participant ``id_particip`` is paired with the
    non-null sample taken exactly ``step_seconds`` later. Every such pair becomes one unit:

    * numeric columns (PM2_5, PM10, PM1_0, NO2, BC) -> a ``ureal`` tuple
      ``(a, b, c, r, t1, t2)`` with ``a = 0.0`` and ``r = False``. ``b`` and ``c`` are chosen
      so that ``b * t + c`` equals the first sample at ``t1`` and the second at ``t2``.
    * text columns (activity, event) -> a ``ustring`` tuple ``(val, t1, t2)`` holding the
      value observed at ``t1``.

    Args:
        id_particip: participant id; must be one of ``liste_id``.
        colonne: column name; surrounding whitespace is ignored. Must be one of ``colonnes_names``.
        step_seconds: time between two consecutive samples, in seconds (default 10).

    Returns:
        A Spark DataFrame with schema ``mreal`` (numeric columns) or ``mstring`` (text
        columns), ordered by ``t1``. Each row wraps a single unit.
        NOTE(review): if one moving value holding all units is wanted, the units still need
        to be aggregated (e.g. with collect_list).

    Raises:
        ValueError: if the participant or the column is unknown.
    """
    # Strip before validating, so that a name such as ' PM10' is accepted.
    if isinstance(colonne, str):
        colonne = colonne.strip()
    if id_particip not in liste_id or colonne not in colonnes_names:
        raise ValueError("Ce participant n'existe pas || ou la colonne n'existe pas")

    # Non-null samples of the requested column for this participant.
    samples = df.where(df[colonne].isNotNull() & (df['participant_id'] == id_particip))

    # The same samples with every column suffixed by '_temp' (avoids name clashes in the
    # self-join), and the time shifted back by one step. An equi-join on time then pairs
    # each sample with the one taken step_seconds later.
    following = samples.toDF(*[name + '_temp' for name in samples.columns])
    following = following.withColumn('time_temp', following['time_temp'] - step_seconds)

    # Workaround for a Spark self-join issue: re-aliasing 'time' presumably gives it a fresh
    # attribute id, so the join condition below is not ambiguous.
    samples = samples.withColumnRenamed('time', 'time')
    pairs = samples.join(following, following['time_temp'] == samples['time'])

    # One row per segment: start/end time and start/end value. The join discards any earlier
    # ordering, so the sort is applied here to return units ordered by start time.
    segments = pairs.select(
        pairs['time'].alias('t1'),
        (pairs['time_temp'] + step_seconds).alias('t2'),
        pairs[colonne].alias('v1'),
        pairs[colonne + '_temp'].alias('v2'),
    ).orderBy('t1')

    if colonne in ('activity', 'event'):
        # Text column: keep the value seen at the start of the segment.
        string_units = segments.rdd.map(lambda r: [[(r.v1, r.t1, r.t2)]])
        return spark.createDataFrame(string_units, mstring)

    # Numeric column: linear interpolation v(t) = b * t + c through (t1, v1) and (t2, v2).
    real_units = segments.rdd.map(lambda r: [[(
        0.0,
        (r.v2 - r.v1) / (r.t2 - r.t1),
        (r.v1 * r.t2 - r.v2 * r.t1) / (r.t2 - r.t1),
        False,
        r.t1,
        r.t2,
    )]])
    return spark.createDataFrame(real_units, mreal)

    
# Example: PM1.0 units for participant 9999920.
a = get_MReal(id_particip=9999920, colonne="PM1_0")
a.show()

+--------------------+
|               units|
+--------------------+
|[[0.0, 0.0, 19.0,...|
|[[0.0, -0.3, 4.72...|
|[[0.0, 0.0, 16.0,...|
|[[0.0, 0.0, 16.0,...|
|[[0.0, 0.0, 16.0,...|
|[[0.0, 0.0, 16.0,...|
|[[0.0, 0.0, 16.0,...|
|[[0.0, -0.1, 1.57...|
|[[0.0, 0.0, 15.0,...|
|[[0.0, 0.0, 15.0,...|
|[[0.0, 0.0, 15.0,...|
|[[0.0, 0.0, 15.0,...|
|[[0.0, 0.0, 15.0,...|
|[[0.0, 0.0, 15.0,...|
|[[0.0, 0.0, 15.0,...|
|[[0.0, 0.0, 15.0,...|
|[[0.0, 0.0, 15.0,...|
|[[0.0, 0.0, 15.0,...|
|[[0.0, 0.0, 15.0,...|
|[[0.0, -0.1, 1.57...|
+--------------------+
only showing top 20 rows

