In [None]:
import random
import re
import json
from itertools import tee
import pyspark
from pyspark.sql import SparkSession
import pandas as pd
import pymongo

##### Initialisation de MongoDB & Spark

In [None]:
# Connect to the MongoDB server on localhost, port 27017.
client = pymongo.MongoClient("mongodb://127.0.0.1:27017/")
# Reuse the active Spark session if one exists, otherwise create it.
spark = SparkSession.builder.getOrCreate()
# Drop any existing 'AviFest' database so every run starts from an empty one.
client.drop_database('AviFest')

##### Création de la base de données

In [None]:
# Handle on the 'AviFest' database; the collections used below are taken from it.
db = client['AviFest']

Lecture du fichier persons.json qui contient une liste de personnes générée aléatoirement par le script gen_persons.py.  
Traitement et nettoyage des données.

In [None]:
# Read persons.json with Spark, then convert to a pandas DataFrame for row-wise cleaning.
df = spark.read.json('persons.json').toPandas()

def preprocessing(row):
    """Normalise one person record in place and return it.

    - ``sexe`` is reduced to its first character (an empty string stays empty).
    - ``name`` is split on whitespace into ``firstname`` and ``lastname``;
      any further tokens are ignored, and a missing part becomes ``''``
      instead of raising ``ValueError``.
    - the original ``name`` entry is removed.

    Parameters
    ----------
    row : pandas.Series or dict
        Record providing string values under ``'sexe'`` and ``'name'``.

    Returns
    -------
    The same ``row`` object, modified.
    """
    # Slice instead of index: an empty string yields '' rather than IndexError.
    row['sexe'] = row['sexe'][:1]

    # split() without an argument collapses runs of whitespace and strips the
    # ends, so "John  Doe" no longer produces an empty last name.
    name_parts = row['name'].split()
    row['firstname'] = name_parts[0] if name_parts else ''
    row['lastname'] = name_parts[1] if len(name_parts) > 1 else ''

    del row['name']

    return row

# Clean every record: axis=1 hands each row to preprocessing as a Series.
df = df.apply(preprocessing, axis=1)

Insertion dans la collection "users"

In [None]:
users = db['users']

# Fields copied as-is from the cleaned persons DataFrame into each document.
user_fields = ('phone', 'email', 'sexe', 'age', 'firstname', 'lastname')

# Build all documents first, then send them in a single bulk insert instead
# of one insert_one() round trip per row.
user_documents = [
    {field: row[field] for field in user_fields}
    for _, row in df.iterrows()
]

# insert_many() rejects an empty list, so skip the call when there is nothing to insert.
if user_documents:
    users.insert_many(user_documents)

In [None]:
# Sanity check: print every document stored in the 'users' collection.
for document in users.find({}):
    print(document)

Lecture du fichier show.csv  
Traitement et nettoyage des données

In [None]:
# Read show.csv using its first line as header and let Spark infer column types.
df = spark.read.options(header='True', inferSchema='True', delimiter=',') \
  .csv("show.csv")

# Spark names header-less columns "_c<position>" (e.g. _c4). Match that exact
# pattern: a plain substring test on '_c' would also drop legitimate columns
# whose name merely contains it.
unnamed_columns = [name for name in df.columns if re.fullmatch(r'_c\d+', name)]
df = df.drop(*unnamed_columns).toPandas()

Insertion dans la collection "shows"

In [None]:
shows = db['shows']

# Build all documents first, then send them in a single bulk insert instead
# of one insert_one() round trip per row.
show_documents = [
    {
        'title': row['title'],
        'type': row['type'],
        'artist': row['artist'],
        'price': '',       # left empty by this cell
        'nbPlace': row['nb_place'],
        'idLocation': '',  # left empty by this cell
    }
    for _, row in df.iterrows()
]

# insert_many() rejects an empty list, so skip the call when there is nothing to insert.
if show_documents:
    shows.insert_many(show_documents)

In [None]:
# Sanity check: print every document stored in the 'shows' collection.
for document in shows.find({}):
    print(document)

Lecture du fichier parkings.json  
Traitement et nettoyage

In [None]:
parkings = db['parkings']

# Load the parkings once; the multiline option lets Spark parse a JSON
# document that spans several lines.
parkings_df = spark.read.option("multiline", "true").json('parkings.json').toPandas()
nb_parkings = len(parkings_df)

# Random booleans, True with probability 0.75 (i.e. about 25% False).
list_pmr = [random.random() < 0.75 for _ in range(nb_parkings)]

description: str  # type hint for the loop variable unpacked below
parking_documents = []

# Walk the parkings together with two boolean lists: list_pmr for the 'pmr'
# flag and the same list reversed for the 'busFestiv' flag (so the bus flags
# mirror the pmr flags rather than being an independent draw).
# Each row is unpacked positionally, following the nested structure Spark
# inferred for the file: ((long, lat), _), (description, name), _
# NOTE(review): this depends on the field order of parkings.json — re-check
# the unpacking if the file layout changes.
for (_, (((long, lat), _), (description, name), _)), pmr, bus in zip(
        parkings_df.iterrows(), list_pmr, reversed(list_pmr)):

    # A missing description is treated as empty text so that both the slot
    # lookup and the 'gratuit' test below still work.
    description = description or ''

    # The slot count is the number written between double asterisks, e.g. "**120**".
    slot_match = re.search(r'\*\*(\d+)\*\*', description)
    # No match, or a count of 0, is stored as 'unknown'.
    slot = (int(slot_match.group(1)) if slot_match else 0) or 'unknown'

    parking_documents.append({
        'type': 'voiture',
        'name': name,
        'nbslots': slot,
        # Paying unless the description mentions 'gratuit' (case-insensitive).
        'paying': 'gratuit' not in description.lower(),
        'busFestiv': bus,
        'pmr': pmr,
        'lat': lat,
        'long': long,
    })

# Single bulk insert; insert_many() rejects an empty list, hence the guard.
if parking_documents:
    parkings.insert_many(parking_documents)

In [None]:
# Sanity check: print every document stored in the 'parkings' collection.
for document in parkings.find({}):
    print(document)