La cellule ci après permet d'importer la bibliotheque qui initialise spark en local

In [1]:
import findspark
findspark.init('/opt/spark-3.0.1-bin-hadoop3.2')

La cellule ci apres permet d'importer les bibliotheques pyspark pour la connexion entre spark et mongodb

In [2]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql import SparkSession

In [3]:
input_uri="mongodb://127.0.0.1/pharma_db.medecine"
output_uri="mongodb://127.0.0.1/pharma_db.medecine"

myspark = SparkSession \
    .builder \
    .appName("myGIS") \
    .config("spark.mongodb.input.uri", input_uri) \
    .config("spark.mongodb.output.uri", output_uri) \
    .config("spark.jars.packages","org.mongodb.spark:mongo-spark-connector_2.12:2.4.2") \
    .getOrCreate()

La cellule ci dessous permet d'extraire des donnees de la collection medical_products de la base GIS

In [4]:
df = myspark.read.format("com.mongodb.spark.sql.DefaultSource").load()   

print(df.printSchema()) 

df.show()

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- cluster_ref: string (nullable = true)
 |-- drugs: string (nullable = true)
 |-- pharmacy: string (nullable = true)
 |-- quantity: double (nullable = true)
 |-- region_name: string (nullable = true)
 |-- site_ref: string (nullable = true)
 |-- specif: string (nullable = true)

None
+--------------------+-----------+-------------+--------+--------+-----------+--------+--------+
|                 _id|cluster_ref|        drugs|pharmacy|quantity|region_name|site_ref|  specif|
+--------------------+-----------+-------------+--------+--------+-----------+--------+--------+
|[602728d04079925f...|      (0,0)|  Amocexiline|  Soudia|    40.0|Belle-ville|   (0,1)|400mg/ml|
|[602728d04079925f...|      (0,0)|    Diazopame|  Soudia|    40.0|Belle-ville|   (0,1)|  5mg/ml|
|[602728d04079925f...|      (0,0)|    Etomidate|  Soudia|    40.0|Belle-ville|   (0,1)|  2mg/ml|
|[602728d04079925f...|      (0,0)|  Paracetamol|  S

La cellule ci dessous permet d'inserer des donnees de la collection "medecines" de la base GIS

In [5]:
schema = StructType([
    StructField("region_name", StringType()),
    StructField("cluster_ref",  StringType()),
    StructField("site_ref", StringType()),
    StructField("pharmacy",  StringType()),
    StructField("drugs",  StringType()),
    StructField("specif",  StringType()),
    StructField("quantity",  IntegerType())
])

df = myspark.createDataFrame([('Lafiabougou', '(20,20)', '(60,60)','Harmony','Artefan','500mg/ml',15),('Lafiabougou', '(20,20)', '(60,60)','Harmony','Lumatherme','500mg/ml',20) ], schema)

df.show()

df.write.format('com.mongodb.spark.sql.DefaultSource').mode("append").save()

+-----------+-----------+--------+--------+----------+--------+--------+
|region_name|cluster_ref|site_ref|pharmacy|     drugs|  specif|quantity|
+-----------+-----------+--------+--------+----------+--------+--------+
|Lafiabougou|    (20,20)| (60,60)| Harmony|   Artefan|500mg/ml|      15|
|Lafiabougou|    (20,20)| (60,60)| Harmony|Lumatherme|500mg/ml|      20|
+-----------+-----------+--------+--------+----------+--------+--------+



In [6]:
import pymongo # Interface from Python <---> MongoDB
import pandas as pd
import threading
import time
import os
from pymongo import MongoClient
from bson.code import Code # Help MongoDB with map-reduce

client = MongoClient('mongodb://localhost:27017/')

db = client["gis"]

collection = db["medecines"]

produits = collection.find({})

# print(produits.next())

start_time = time.time()

def task1():
    # Map Function:
    start_t1 = time.time()
    mapper = Code("function(){ var skill =this.drugs ='Morphine';\
               if(this.quantity>0){\
                   for(i in skill){emit({Information:{Pharmacy:this.pharmacy,Produits:this.drugs,\
                   Specification:this.specif,Reference:this.cluster_ref}},\
                   1);}\
                 }\
               }")
    # Reduce:
    
    reducer = Code("function(key,values){return Array.sum(values);}")

    # Bringing it all together, creating an output file: 'ppl_skillCount'
    map_red = collection.map_reduce(mapper,reducer,'ppl_skillCount')

    end_t1 = time.time()

    print("Le temps d'execution du task1{}", end_t1 -start_t1)


def task2():
    # Map Function:
    start_t2 = time.time() 
    mapper1 = Code("function(){ var skill =this.drugs ='Naloxone';\
               if(this.quantity>0){\
                   for(i in skill){emit({Information:{Pharmacy:this.pharmacy,Produits:this.drugs,Specification:this.specif,Reference:this.cluster_ref}},\
                   1);}\
                 }\
               }")
    reducer = Code("function(key,values){return Array.sum(values);}")

    # Bringing it all together, creating an output file: 'ppl_skillCount'
    map_reduce1 = collection.map_reduce(mapper1,reducer,'ppl_skillCount2')
    
    end_t2 = time.time() 
    
    print("Le temps d'execution du task2{}", end_t2 -start_t2)

end_time = time.time()

print("Le temps d'execution du Global Parallele{}", end_time -start_time)

if __name__ == "__main__":
 
    # print ID of current process
    print("ID of process running main program: {}".format(os.getpid()))
 
    # print name of main thread
    print("Main thread name: {}".format(threading.main_thread().name))
   
    # creating threads
   
    t1 = threading.Thread(target=task1, name='t1')
    t2 = threading.Thread(target=task2, name='t2')  

    # starting threads
    print("starting threads Now !")
    t1.start()   
    t2.start()    
    # wait until all threads finish
    print("Wait until all threads finish")
    t1.join()    
    t2.join()   

Le temps d'execution du Global Parallele{} 0.00043010711669921875
ID of process running main program: 6180
Main thread name: MainThread
starting threads Now !
Wait until all threads finish
Le temps d'execution du task2{} 0.5231492519378662
Le temps d'execution du task1{} 0.6580052375793457


In [7]:
# Printing it all out as dictionary format stored in a list:
mapper = Code("function(){ var skill =this.drugs ='Morphine';\
           if(this.quantity>0){\
               for(i in skill){emit({Information:{Pharmacy:this.pharmacy,Produits:this.drugs,\
               Specification:this.specif,Reference:this.cluster_ref}},\
               1);}\
             }\
           }")
# Reduce:

reducer = Code("function(key,values){return Array.sum(values);}")

# Bringing it all together, creating an output file: 'ppl_skillCount'
map_red = collection.map_reduce(mapper,reducer,'ppl_skillCount')

ppl_skills=[]
for skills in map_red.find():
    ppl_skills.append(skills)
ppl_skills[:5]


[{'_id': {'Information': {'Pharmacy': 'Eureka',
    'Produits': 'Morphine',
    'Specification': '500mg/ml',
    'Reference': '(0,3)'}},
  'value': 330.0},
 {'_id': {'Information': {'Pharmacy': 'Hayatt',
    'Produits': 'Morphine',
    'Specification': '75mg/ml',
    'Reference': '(0,2)'}},
  'value': 44.0},
 {'_id': {'Information': {'Pharmacy': 'Wobi',
    'Produits': 'Morphine',
    'Specification': '50mg/ml',
    'Reference': '(2,5)'}},
  'value': 1100.0},
 {'_id': {'Information': {'Pharmacy': 'Soudia',
    'Produits': 'Morphine',
    'Specification': '5mg/ml',
    'Reference': '(0,0)'}},
  'value': 11.0},
 {'_id': {'Information': {'Pharmacy': 'Eureka',
    'Produits': 'Morphine',
    'Specification': '5mg/ml',
    'Reference': '(0,3)'}},
  'value': 55.0}]