In [15]:
"""
30/07/2019
Luis Esteban

Data Simulation for Helpify PoC.
"""
# Import necessary modules
from pyspark.sql import SparkSession
# check if we have cloudant package 
!pip install cloudant
from cloudant.client import Cloudant
from cloudant.error import CloudantException
from cloudant.result import Result, ResultByKey
import datetime
import random
from numpy.random import normal
import time

Collecting cloudant
  Using cached https://files.pythonhosted.org/packages/d2/40/b2e2c47eb1ef6a1ff9519dbdf916356b97dd9d14d8c8b64f43b43fc56e1a/cloudant-2.12.0-py3-none-any.whl
Collecting requests<3.0.0,>=2.7.0 (from cloudant)
  Using cached https://files.pythonhosted.org/packages/51/bd/23c926cd341ea6b7dd0b2a00aba99ae0f828be89d72b2190f27c11d4b7fb/requests-2.22.0-py2.py3-none-any.whl
Collecting certifi>=2017.4.17 (from requests<3.0.0,>=2.7.0->cloudant)
  Using cached https://files.pythonhosted.org/packages/69/1b/b853c7a9d4f6a6d00749e94eb6f3a041e342a885b87340b79c1ef73e3a78/certifi-2019.6.16-py2.py3-none-any.whl
Collecting idna<2.9,>=2.5 (from requests<3.0.0,>=2.7.0->cloudant)
  Using cached https://files.pythonhosted.org/packages/14/2c/cd551d81dbe15200be1cf41cd03869a46fe7226e7450af7a6545bfc474c9/idna-2.8-py2.py3-none-any.whl
Collecting urllib3!=1.25.0,!=1.25.1,<1.26,>=1.21.1 (from requests<3.0.0,>=2.7.0->cloudant)
  Using cached https://files.pythonhosted.org/packages/e6/60/247f23a7121ae63

In [1]:
# Credentials to connect to Cloudant.
# Keys read elsewhere in this notebook: "host", "username" and "password"
# (Spark Cloudant connector) plus "url" (cloudant Python client).
# Values are taken from environment variables so that no secret is stored in the
# notebook; a variable that is not set simply leaves its key out of the dict.
import os

credentials = {
    key: os.environ[env_name]
    for key, env_name in (
        ("host", "CLOUDANT_HOST"),
        ("username", "CLOUDANT_USERNAME"),
        ("password", "CLOUDANT_PASSWORD"),
        ("url", "CLOUDANT_URL"),
    )
    if env_name in os.environ
}

Waiting for a Spark session to start...
Spark Initialization Done! ApplicationId = app-20190725205104-0000
KERNEL_ID = 73a00521-2632-4949-a7bc-799ca173d4b4


In [49]:
class DisasterCreator():
    """
    Data simulation for a natural disaster.

    Expected call order: conect_cloudant_static_data() and create_disaster_database()
    must both have been called before simulate().

    Methods:
        - __init__: Credentials for Cloudant Connection.
        - conect_cloudant_static_data: Conection to Cloudant enviroment data-for-simulation database
        - create_disaster_database: Creation of Cloudant Database to load emergency data
        - close: Disconnect the Cloudant client opened by create_disaster_database.
        - simulate: Simulation of emergency data.
    """
    def __init__(self,credentials=None):
        """
        Credentials for Cloudant Connection.
        :param credentials: dict. Cloudant Conection Credentials. Must have host, username and
            password information (Spark connector) and url (used by create_disaster_database).
            When omitted, the notebook-level ``credentials`` variable is used; it is looked up
            at call time, so re-running the credentials cell is enough to pick up new values.
        """
        if credentials is None:
            credentials = globals().get("credentials", {})
        self.__credentials__ = credentials
        self.spark = SparkSession.builder.getOrCreate()
        # Memo of static-data lookups, keyed by (document id, field, index).
        self._lookup_cache = {}
    def conect_cloudant_static_data(self,name="data_model"):
        """
        Conection to Cloudant enviroment data-for-simulation database
        :param name: str. Name of data-for-simulation database
        :return: 
        """
        # Use the session owned by this object, not a notebook-level `spark` variable.
        self.static_data = self.spark.read.format("org.apache.bahir.cloudant")\
        .option("cloudant.host",self.__credentials__["host"])\
        .option("cloudant.username", self.__credentials__["username"])\
        .option("cloudant.password",self.__credentials__["password"])\
        .load(name)
        # Values memoised from a previously loaded database are no longer valid.
        self._lookup_cache = {}
    def create_disaster_database(self,name='madrid'):
        """
        Creation of Cloudant Database to load emergency data
        :param name: str. Name to be given to the emergency database
        :return: 
        """
        # Zero-padded YYYYMMDDHHMM suffix: without padding on hour and minute,
        # 1:23 and 12:03 would both produce "...123".
        db = name+datetime.datetime.today().strftime("%Y%m%d%H%M")
        client = Cloudant(self.__credentials__['username'],self.__credentials__['password'],url=self.__credentials__['url'])
        client.connect()
        # Kept so that close() can end the session later.
        self.__client__ = client
        session = client.session()
        self.__username__ = session['userCtx']['name']
        self.__databases__ = client.all_dbs()
        my_database = client.create_database(db)
        if my_database.exists():
            print("'{0}' successfully created.\n".format(db))
        self.__my_database__ = client[db]
    def close(self):
        """
        Disconnect the Cloudant client opened by create_disaster_database, if any.
        :return: 
        """
        client = getattr(self, "__client__", None)
        if client is not None:
            client.disconnect()
            self.__client__ = None
    def _lookup(self,doc_id,field,index):
        """
        Read one value from the static data: document ``doc_id``, nested column ``field.index``.
        Each distinct lookup is queried once and then served from an in-memory memo.
        :param doc_id: str. Value of the _id column identifying the document
        :param field: str. Column holding the indexed values
        :param index: int. Position inside that column
        :return: The first cell of the first row returned by the query
        """
        # setdefault keeps this working on instances created without __init__.
        cache = self.__dict__.setdefault("_lookup_cache", {})
        key = (doc_id, field, index)
        if key not in cache:
            cache[key] = self.static_data.filter("_id = '"+doc_id+"'")\
            .select(field+'.'+str(index)).collect()[0][0]
        return cache[key]
    def simulate(self,numer_of_reports = 1000,stress = 10,a_population=7,b_population=3):
        """
        Simulation of emergency data.
        One document is written to the emergency database per simulated report, with a
        random pause of 0 to 2 seconds before each write.
        :param numer_of_reports: int. Number of reports to be simulated
        :param stress: int. Parameter to tune the emergency stress. Each tuned variable is
            overridden when a random draw in 0..10 is greater than (10 - stress), so higher
            values trigger the overrides (and the log events) more often.
        :param a_population: int. Tuning population location distribution. A draw in 0..10
            above this value places the person in "camp_A".
        :param b_population: int. Tuning population location distribution. A draw above this
            value (and not above a_population) places the person in "camp_B"; otherwise
            random coordinates are generated.
        :return: 
        :raises RuntimeError: if the static data or the emergency database is not ready.
        """
        # Fail before any work is done rather than in the middle of the first report.
        if getattr(self, "static_data", None) is None:
            raise RuntimeError("Static data not loaded: call conect_cloudant_static_data() first.")
        if getattr(self, "__my_database__", None) is None:
            raise RuntimeError("Emergency database not created: call create_disaster_database() first.")

        t_sleep = 2  # Upper bound, in seconds, of the pause before each write
        # NOTE(review): there is no separator between date and time; kept unchanged
        # because stored documents / consumers may rely on this exact format.
        ts_format = "%m/%d/%Y%H:%M:%S"
        dni_num = [0,1,2,3,4,5,6,7,8,9]
        dni_let = ['A','B','C','D','E','F','G','H']
        # Bounding box used for free-form locations (latitude ~40.4, longitude ~-3.7).
        lat_min, lat_max = 40.403484, 40.466946
        lon_min, lon_max = -3.733526, -3.664357
        # Centre of the box: the midpoint of the bounds, not half of their difference.
        lat_center = (lat_max+lat_min)/2
        lon_center = (lon_max+lon_min)/2
        threshold = 10 - stress
        logf_id_event_aux = 0
        logp_id_event_aux = 0
        for i in range(numer_of_reports):
            timestamp = datetime.datetime.today().strftime(ts_format)
            print(i)
            # We'll use ran_# to tune the stress of the disaster
            ran_1 = random.randint(0,10) # Tuning professions
            ran_2 = random.randint(0,10) # Tuning status in disaster 
            ran_3 = random.randint(0,10) # Tuning health status
            ran_4 = random.randint(0,10) # Tuning location
            ran_5 = random.randint(0,10) # Tuning languages
            ran_6 = random.randint(0,10) # Tuning logged events
            # Simulation sleeping time
            rant = random.randint(0,t_sleep)
            # We'll use rand_ to take the values of the simulated variables
            rand_name = random.randint(0,99) # Name
            rand_sn = random.randint(0,99) # Surname
            rand_age = normal(40,20) # Age
            rand_prof = random.randint(0,8) # Profession
            if ran_1 > threshold:
                rand_prof = 8
            rand_marital = random.randint(0,2) # Marital status
            rand_status = random.randint(0,3) # Status in emergency
            if ran_2 > threshold:
                rand_status = 3
            rand_health = random.randint(0,3) # General health issues
            if ran_3 > threshold:
                rand_health = 3
            if ran_4 > a_population:
                last_location = "camp_A" # Location - Camp A of affected people
            elif ran_4 > b_population:
                last_location = "camp_B" # Location - Camp B of affected people
            else:
                # Chatbot inputs: (lat, lon) scattered around the centre of the box.
                # NOTE(review): a standard deviation of 1 degree is much wider than the box.
                last_location = (float(normal(lat_center,1)),float(normal(lon_center,1)))
            rand_yesno  = random.randint(0,1)
            rand_lan = random.randint(0,4) # languages
            if ran_5 > threshold:
                rand_lan = 0
            adress = None # Not using this version
            rand_medneed = random.randint(0,3) # Medical needs
            # Tune number of males / females (the sleep draw doubles as the selector)
            if rant % 2 == 0:
                names = 'malenames'
                gender = 'male'
            else:
                names = 'femalenames'
                gender = 'female'

            # Logged data: every field stays None unless an event is generated below,
            # so a report without events never reuses values of a previous report.
            logf_id_event = logf_timestamp = None
            logf_name = logf_lat = logf_lon = logf_imp = None
            logp_id_event = logp_timestamp = None
            logp_place1 = logp_place2 = logp_place3 = None
            logp_need = logp_contact = logp_name = None
            if ran_6 > threshold:
                logf_id_event_aux = logf_id_event_aux + 1
                logf_id_event = logf_id_event_aux
                logf_timestamp = (datetime.datetime.today() + datetime.timedelta(seconds = random.randint(0,120))).strftime(ts_format)
                logf_place = random.randint(0,1160)
                logf_name = self._lookup('amenities','name',logf_place)
                logf_lat = self._lookup('amenities','lat',logf_place)
                logf_lon = self._lookup('amenities','lon',logf_place)
                logf_imp = self._lookup('impact','description',random.randint(0,3))

                logp_id_event_aux = logp_id_event_aux + 1
                logp_id_event = logp_id_event_aux# TODO: Set numeration internal to every personal record
                logp_timestamp = (datetime.datetime.today() + datetime.timedelta(seconds = random.randint(0,120))).strftime(ts_format)
                logp_place = random.randint(0,1160)
                logp_place1 = self._lookup('amenities','name',logp_place)
                logp_place2 = self._lookup('amenities','lat',logp_place)
                logp_place3 = self._lookup('amenities','lon',logp_place)
                logp_need = self._lookup('help needs','description',random.randint(0,4))
                # Same 0..99 index range as rand_name: both read the same name lists.
                logp_name = self._lookup(names,'name',random.randint(0,99))
                if random.randint(0,10) > 7:
                    logp_contact = "6"+"".join([str(x) for x in random.choices(dni_num,k=7)])

            # Not using family information this version
            fam_id_person = None
            fam_timestamp = None
            fam_name = None
            fam_last_name = None
            fam_age = None
            fam_adress = None

            time.sleep(rant)
            crisis_element = {
            "id_ord" : i,
            "timestamp" : timestamp,
            "personal_information":{
                "id":  "".join([str(x) for x in random.choices(dni_num,k=8)])+random.choice(dni_let),
                "name": self._lookup(names,'name',rand_name),
                "last_name": self._lookup('surnames','surname',rand_sn),
                "gender": gender,
                "age": max(0,int(rand_age)), # the normal draw can be negative
                "profession": self._lookup('profession','description',rand_prof),
                "marital": self._lookup('marital','description',rand_marital),
                "status_emergency": self._lookup('statusinemergency','description',rand_status),
                "health": self._lookup('healthinformation','description',rand_health),
                "last_location": last_location,
                "help_attitude": self._lookup('yesno','description',rand_yesno),
                "languages": self._lookup('language','description',rand_lan),
                "adress": adress,
                "medical_needs": self._lookup('medical needs','description',rand_medneed)
            },
            "log_facilities":{
                "event":{
                    "id_event":logf_id_event,
                    "timestamp":logf_timestamp,
                    "name":logf_name ,
                    "lat": logf_lat ,
                    "lon": logf_lon ,
                    "impact": logf_imp 
                }
                # More events if needed       

            },
            "log_people":{
                "event":{
                    "id_event":logp_id_event,
                    "timestamp":logp_timestamp,
                    "place": logp_place1 ,
                    "lat": logp_place2 ,
                    "lon": logp_place3 ,
                    "need":logp_need ,
                    "contact":logp_contact ,
                    "name":logp_name 
                }
                # More events if needed       

            },
            "family":{
                "person":{
                    "id_person":fam_id_person,
                    "timestamp":fam_timestamp,
                    "name":fam_name,
                    "last_name":fam_last_name,
                    "age":fam_age,
                    "adress":fam_adress
                }
            }    
            }
            my_document = self.__my_database__.create_document(crisis_element)


In [None]:
# Run the PoC simulation end to end. Every argument below repeats the default
# of the corresponding method, spelled out so the PoC configuration is visible here.
dc = DisasterCreator()
dc.conect_cloudant_static_data(name="data_model")
dc.create_disaster_database(name='madrid')
dc.simulate(numer_of_reports=1000, stress=10, a_population=7, b_population=3)

'madrid201907252141' successfully created.

0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
