#### Charger la donnée bateaux.parq dans une DataFrame

In [2]:
from datetime import datetime 
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

df = sqlContext.read.parquet('/FileStore/tables/bateaux.parq')
df.registerTempTable("bateaux")

#### Afficher le nombre d'éléments du DF

In [4]:
df.count()

#### Afficher le type des éléments du DF

In [6]:
df.dtypes

#### Afficher le schéma du DF

In [8]:
df.printSchema()

In [9]:
df.show()

#### Ordonnez les bateaux par vitesse

In [11]:
df.sort('speed').show()

#### Pour disposer d'une date plus lisible, rajoutez la colonne "timestamp_readable"
#### Pour cela, il vous faudra d'abord transformer le timestamp en secondes (timestamp_pos est en ms) et ensuite convertir en format date

In [13]:
import datetime
#timestamp_seconds = datetime.datetime.fromtimestamp(df.timestamp_pos).isoformat()
#print(timestamp_seconds)
newdf = df.withColumn("timestamp_readable", df.timestamp_pos)
newdf.show()

#### Il est possible de créer un nouveau dataframe contenant des statistiques sur un dataframe

In [15]:
statsdf = df.describe()
statsdf.show()

#### Filtrer le dataframe bateaux sur un bateau en particulier

In [17]:
import pyspark.sql.functions as f
filterdf = df.where((f.col("id_vessel") == '124436'))
filterdf.show()

#### Le nombre de bateaux qui ont été à l'arret et qui se sont déplacés

In [19]:
speed = df['speed']==0
dfd = df.where(speed)
#dfd.show()
##raisonnement bidon
df.count(f.col('id_vessel'))

#### Les types distincts de bateaux

In [21]:
df.select('type').distinct().sort('type').show(300)

#### On affiche la moyenne de la vitesse des bateaux sans tenir compte des bateaux a l'arret

In [23]:
from pyspark.sql.functions import when, avg

speedAvg = df.select(avg(when(df['speed'] > 0, df['speed'])))
speedAvg.show()

#### On affiche la date min puis  la date max pour chaque bateau

In [25]:
test = sqlContext.sql("SELECT id_vessel, MIN(timestamp_pos), MAX(timestamp_pos) FROM bateaux GROUP BY id_vessel")
test.show()

#### Créer un dataframe avec l'id, la vitesse et le timestamp readable

In [27]:
data = newdf.select("id_vessel","speed","timestamp_readable")
data.show()

#### Créer une colonne mois pour le mois en cours

In [29]:
from datetime import datetime
from pyspark.sql.functions import lit
data = data.withColumn('month', lit(datetime.now().month))
data.show()

#### Ajouter une colonne boolean à True si le bateau est en mouvement

In [31]:
from pyspark.sql.functions import when

data = data.withColumn("inMovement", when(data['speed'] > 0, True).otherwise(False))
data.show()

#### Enregistrer la table

In [33]:
data.write.csv('data2.csv')
#df.write.format('com.databricks.spark.csv').save('mycsv.csv')

#### Afficher par type, le nombre de bateaux, la vitesse max et la moyenne de la vitesse, ordonnés par nombre de bateaux (requete SQL)

In [35]:
test = sqlContext.sql("SELECT COUNT(id_vessel) as nbBoat, AVG(speed), MAX(speed) FROM bateaux GROUP BY type ORDER BY nbBoat DESC")
test.show(100)

##### Mettons nos compétences au service d'un cas d'usage un peu plus intéressant.
##### Nous allons prendre un bateau de notre choix et nous allons tenter de calculer quels sont les bateaux qu'il a pu croiser sur sa route

#### Créer un nouveau dataframe basé sur le DF bateaux et filtré uniquement sur id_vessel de votre choix.
#### Débarassez vous également de la colonne "payload" avec la méthode df.drop("colonne")

In [38]:
dfBateaux = df.where((f.col("id_vessel") == '124436'))
dfBateaux = dfBateaux.drop('payload')
dfBateaux.show()

#### Créer un deuxième dataframe basé sur le DF bateaux, avec les colonnes id_vessel, lat, lon et timestamp_pos
#### On transforme nos DF en rdd grâce à la fonction rdd

In [40]:
dfBoat = df.select('id_vessel', 'lat', 'lon', 'timestamp_pos')
rddBoat = dfBoat.rdd
rddMyBoat = dfBateaux.rdd
#rddBoat.collect()
#rddMyBoat.collect()

#### On calcule le nombre d'enregistrements associés à notre bateau
 
#### On calcule le nombre d'enregistrements du RDD de référence

In [42]:
myBoatRec = rddMyBoat.count()
referenceRec = rddBoat.count()
myBoatRec
referenceRec

##### Créer un RDD batEtude produit cartesien de notre RDD bateau et du RDD référentiel

In [44]:
batEtude = rddMyBoat.cartesian(rddBoat)
batEtude

##### On affiche la première ligne pour analyser son contenu
##### Astuce : n'oubliez pas que l'on est maintenant sur des RDD

In [46]:
batEtude.take(100)

##### On pourrait s'amuser à compter le nombre de lignes totales du RDD.. mais c'est une action !
##### Il est possible que cela prenne un peu de temps selon la taille de votre RDD contenant un seul id_vessel

In [48]:
batEtude.count()

##### B4 - Filtrer notre produit cartésien pour éliminer les bateaux identiques à notre id_vessel choisi
##### Astuce : notre RDD batEtude est composé de tuples(bateau,bateauRef)

In [50]:
batEtude.groupBy(lambda x:(x[124436],x[124436])).map(lambda x:(x[124436][124436], x[124436][124436])).collect()
##NOPE

##### On va également filtrer les informations des bateaux pour ne conserver que les enregistrements à +/- une heure (3600s)

In [53]:
def calculateDistanceInKilometer(userlat, userlon, otherlat, otherlon):
    AVERAGE_RADIUS_OF_EARTH_KM = 6371
    latDistance = math.radians(userlat - otherlat)
    lngDistance = math.radians(userlon - otherlon)
    sinLat = math.sin(latDistance / 2)
    sinLng = math.sin(lngDistance / 2)
    a = sinLat * sinLat + (math.cos(math.radians(userlat)) * math.cos(math.radians(otherlat))* sinLng * sinLng)
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    return int(AVERAGE_RADIUS_OF_EARTH_KM * c)

##### Filtrer notre RDD en ne conservant que les bateaux pour lesquels la distance est inférieure à 1 km
##### La méthode calculateDistanceInKilometer(lat,lon,lat,long) renvoi la distance en kilomètres entre deux coordonnées