In [9]:
### data_source.py

### This script has the goal to simulate a DataSource of the project.
### It generates some mock data and publishes them to the Ingestor's component (so to the Kafka topic).
### This script run outside the cluster because I think also the real Data Sources will be.
### It is a Python Kafka producer (docs: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html)


### If you don't have kafka-python already installed, you have to run
### >>> pip install kafka-python

############## "USER GENERATOR" #######################


import sys, getopt
import pandas as pd
import numpy as np
from kafka import KafkaProducer
from json import dumps
from time import sleep
from faker import Faker
from faker.providers import date_time
from datetime import date 

# def main(argv):
#    print(len(argv))
#    if len(argv) != 2:
#     print ('Usage: data_source.py -s <kafka_server> -t <topic_name>')
#     #return;

#    try:
#       opts, args = getopt.getopt(argv,"h",["help"])
#    except getopt.GetoptError:
#       print ('Usage: data_source.py -s <kafka_server> -t <topic_name>')
#       sys.exit(2)
#    print(opts)
#    print(args)

#    kafka_server = ''
#    topic_name = ''


# if __name__ == "__main__":
   #Check the arguments passed to the CL (excluding the script name)
   # if(len(sys.argv) - 1 != 2): 
   #      print ('Error: "data_source.py" requires 2 arguments\n')
   #      print ('Usage: data_source.py <kafka_server> <topic_name>')
   #      sys.exit(1)
   # else:
   #      kafka_server = sys.argv[1]
   #      topic_name = sys.argv[2]

   #Create a producer and a connection to the Kafka Broker
   # producer = KafkaProducer(bootstrap_servers=[kafka_server], 
   #                          value_serializer=lambda x: dumps(x).encode('utf-8'))

   # if(producer.bootstrap_connected()):
   #      print("Initial connection established")
   #      for i in range(3):
   #          data = { 'number' : i }
   #          producer.send(topic_name, value=data)
   #          print(data)
   #          sleep(2)
   # else:
   #      print("Something wrong in the initial connection to Kafka Server")
   #      sys.exit(2)
   
    
Faker.seed(0) # for reproducibility sake: WARNING: Calling the same methods with the same version of faker and seed produces the same results!
np.random.seed(0)
fake=Faker()
fake.add_provider(date_time) 

sequenceNumber   = 1
existing_user_id = []
balance          = {}
    
############## "USER GENERATOR" #######################

def random_id_utenti(size, newId, minBalance=0, p=None):
    """
    Generate size-length ndarray of user_id.
    The user id as the format PartnerId + sequenceNumber, where PartnerId is in [HE, CO, CA], and sequenceNumber is a unique integer.
    For example: HE_123 identifies the 123th user of Hera.
    """
    if not p:
        # default probabilities
        p = (0.5, 0.30, 0.19, 0.01) 
    
    partner_id = ("HE", "CO", "CA", "")
    user_id    = []
    global sequenceNumber  #'global' keyword is necessary otherwise the function define another sequenceNumber variable with a scope local to the function
    
    for el in np.random.choice(partner_id, size = size, p = p):
        if(newId):
            #generate a new user_id
            if(el != ""):
                id = el + "_" + str(sequenceNumber)         
                sequenceNumber = sequenceNumber + 1
                balance[id] = 0
            user_id.append(id)
            existing_user_id.append(id)
        elif(minBalance > 0):
            #retrieve an existing user_id with at least "minBalance" Token available
            allowed_ids = [ key for (key,value) in balance.items() if value >= minBalance ]
            if(len(allowed_ids) >= 1):
                user_id.append(np.random.choice(allowed_ids))
            else:
                user_id.append("") #Any user has the right amount
        else:
            #retrieve a random existing user_id
            user_id.append(np.random.choice(existing_user_id)) 
    return user_id

def random_birthdates(size):
    """Generate random dates within range between start and end."""
    birthdates = []
    for _ in range(size):
        birthdates.append(fake.date_of_birth(minimum_age=18, maximum_age=90))
    return birthdates

def calculateAge(birthDates):
    ages = []
    for d in birthDates:
        today = date.today() 
        age = today.year - d.year - ((today.month, today.day) < (d.month, d.day))
        ages.append(age)      
    return ages 

def random_genders(size, p = None):
    """Generate size-length ndarray of genders."""
    if not p:
        # default probabilities
        p = (0.49, 0.50, 0.01)
    gender = ("M", "F", "")
    return np.random.choice(gender, size = size, p = p)

def random_provincie(size, p = None):
    """Generate size-length ndarray of cities."""
    if not p:
        # default probabilities
        p = (0.25, 0.25, 0.25, 0.24, 0.01)
    province = ("BO", "MO", "RE", "PAR", "")
    return np.random.choice(province, size = size, p = p) 


def generate_user_record(size): 
    users_df = pd.DataFrame(columns = ['id_utente', 'Sesso', 'Data di Nascita', 'Eta', 'Provincia'])
    users_df['id_utente']           = random_id_utenti(size=size, newId=True)
    users_df['Sesso']               = random_genders(size) 
    users_df['Data di Nascita']     = random_birthdates(size)
    users_df['Eta']                 = calculateAge(users_df['Data di Nascita'])
    users_df['Provincia']           = random_provincie(size)
    return users_df


   ############## "COMPORTAMENTI GENERATOR" ##############

def random_comportamenti(size):
    comportamenti    = []
    partner_erogante = []
    reward           = []
    id_utenti        = []
    for _ in range(size):
        c = fake.random_element(elements = [('HERA', 'Autolettura consumo gas', 1.6), \
                                           ('HERA', 'Invio elettronico della bolletta', 1.75), \
                                           ('HERA', 'Acquisto energia elettrica da fonti rinnovabili', 2.85), \
                                           ('CONAD', 'Acquisto di prodotti sostenibili', 2.55), \
                                           ('CONAD', 'Recupero bottiglie di plastica', 0.9), \
                                           ('CAMST', 'Acquisto piatti e menu sostenibili', 2.4)] )
        partner_erogante.append(c[0])
        comportamenti.append(c[1])
        reward.append(c[2])
        id = random_id_utenti(size = 1, newId = False)
        balance[id[0]] += c[2]
        id_utenti.append(id[0])
    return comportamenti, partner_erogante, reward, id_utenti

def generate_comportamenti_record(size): 
    comportamenti_df = pd.DataFrame(columns=['comportamento', 'id_utente', 'Partner_erogante', 'reward(tk)'])
    comportamenti_df['comportamento'], comportamenti_df['Partner_erogante'], comportamenti_df['reward(tk)'], comportamenti_df["id_utente"] = random_comportamenti(size)
    return comportamenti_df
   
    
   ############## "PREMI GENERATOR" ######################

def random_premi(size):
    premi            = []
    partner_erogante = []
    prezzi           = []
    id_utenti        = []
    for _ in range(size):
        p = fake.random_element(elements = [('HERA', 'Sconto in bolletta 10€ Hera', 10), \
                                           ('CONAD', 'Buono Spesa 5€ Conad', 5), \
                                           ('CONAD', 'Buono Spesa 5€ Conad', 10),
                                           ('CAMST', 'Buono Spesa 5€ Camst', 5)] )
        partner_erogante.append(p[0])
        premi.append(p[1])
        prezzi.append(p[2])
        id = random_id_utenti(size = 1, newId = False, minBalance = p[2])
        if(id[0] != ''):
            balance[id[0]] -= p[2]  #subtract the price from the balance only if the reward has been registered correctly.
        id_utenti.append(id[0])
    return premi, partner_erogante, prezzi, id_utenti

def generate_premi_record(size):
    premi_df = pd.DataFrame(columns=['premio', 'id_utente', 'Partner_erogante', 'prezzo(tk)'])
    premi_df['premio'], premi_df['Partner_erogante'], premi_df['prezzo(tk)'], premi_df["id_utente"] = random_premi(size)
    return premi_df

In [11]:
generate_user_record(1000)

Unnamed: 0,id_utente,Sesso,Data di Nascita,Eta,Provincia
0,HE_6,M,1996-04-27,24,MO
1,HE_7,M,1985-04-19,35,MO
2,CA_8,M,1971-06-20,49,RE
3,CO_9,F,1995-01-19,26,MO
4,CA_10,F,1978-12-01,42,PAR
...,...,...,...,...,...
995,CO_992,M,2000-02-18,21,BO
996,HE_993,M,1950-05-09,70,RE
997,HE_994,M,1941-01-10,80,MO
998,HE_995,M,1975-02-08,46,MO


In [12]:
comportamenti = generate_comportamenti_record(100000)
comportamenti

Unnamed: 0,comportamento,id_utente,Partner_erogante,reward(tk)
0,Invio elettronico della bolletta,CO_459,HERA,1.75
1,Invio elettronico della bolletta,HE_38,HERA,1.75
2,Autolettura consumo gas,HE_914,HERA,1.60
3,Invio elettronico della bolletta,CO_822,HERA,1.75
4,Acquisto piatti e menu sostenibili,HE_432,CAMST,2.40
...,...,...,...,...
99995,Acquisto piatti e menu sostenibili,CO_753,CAMST,2.40
99996,Recupero bottiglie di plastica,HE_782,CONAD,0.90
99997,Autolettura consumo gas,HE_227,HERA,1.60
99998,Autolettura consumo gas,CO_931,HERA,1.60


In [13]:
balance

{'CO_1': 228.9000000000001,
 'CO_2': 164.99999999999997,
 'CO_3': 171.20000000000002,
 'CO_4': 237.5,
 'HE_5': 205.60000000000002,
 'HE_6': 197.65000000000003,
 'HE_7': 180.85000000000014,
 'CA_8': 158.4,
 'CO_9': 187.4,
 'CA_10': 202.84999999999997,
 'CA_11': 212.90000000000012,
 'CO_12': 195.2,
 'HE_13': 208.79999999999998,
 'CO_14': 181.75000000000003,
 'HE_15': 183.50000000000006,
 'CO_16': 199.65000000000003,
 'HE_17': 211.00000000000006,
 'CA_18': 171.75000000000003,
 'CO_19': 168.20000000000005,
 'HE_20': 208.9000000000001,
 'HE_21': 219.35000000000002,
 'CO_22': 199.30000000000013,
 'HE_23': 180.25,
 'CO_24': 134.9,
 'HE_25': 180.39999999999998,
 'CO_26': 227.55000000000004,
 'CO_27': 187.90000000000003,
 'CO_28': 199.49999999999994,
 'CA_29': 199.00000000000003,
 'CO_30': 186.35000000000002,
 'HE_31': 224.35000000000008,
 'HE_32': 196.60000000000005,
 'CO_33': 192.15000000000006,
 'HE_34': 182.15000000000003,
 'CO_35': 204.8000000000001,
 'CO_36': 216.85,
 'HE_37': 191.0500000

In [15]:
comportamenti.loc[:, ['id_utente','reward(tk)']].groupby("id_utente").sum()

Unnamed: 0_level_0,reward(tk)
id_utente,Unnamed: 1_level_1
CA_10,202.85
CA_100,167.25
CA_102,192.85
CA_105,206.05
CA_107,191.65
...,...
HE_990,197.95
HE_993,214.65
HE_994,214.55
HE_995,213.90


In [16]:
premi = generate_premi_record(50000)
premi

Unnamed: 0,premio,id_utente,Partner_erogante,prezzo(tk)
0,Buono Spesa 5€ Conad,HE_123,CONAD,5
1,Sconto in bolletta 10€ Hera,HE_68,HERA,10
2,Buono Spesa 5€ Conad,CA_872,CONAD,10
3,Sconto in bolletta 10€ Hera,HE_67,HERA,10
4,Buono Spesa 5€ Conad,HE_206,CONAD,5
...,...,...,...,...
49995,Buono Spesa 5€ Conad,,CONAD,5
49996,Buono Spesa 5€ Conad,,CONAD,5
49997,Buono Spesa 5€ Conad,,CONAD,10
49998,Buono Spesa 5€ Camst,,CAMST,5


In [17]:
balance

{'CO_1': 3.900000000000091,
 'CO_2': 4.999999999999972,
 'CO_3': 1.200000000000017,
 'CO_4': 2.5,
 'HE_5': 0.6000000000000227,
 'HE_6': 2.650000000000034,
 'HE_7': 0.8500000000001364,
 'CA_8': 3.4000000000000057,
 'CO_9': 2.4000000000000057,
 'CA_10': 2.849999999999966,
 'CA_11': 2.9000000000001194,
 'CO_12': 0.19999999999998863,
 'HE_13': 3.799999999999983,
 'CO_14': 1.7500000000000284,
 'HE_15': 3.500000000000057,
 'CO_16': 4.650000000000034,
 'HE_17': 1.0000000000000568,
 'CA_18': 1.7500000000000284,
 'CO_19': 3.2000000000000455,
 'HE_20': 3.900000000000091,
 'HE_21': 4.350000000000023,
 'CO_22': 4.300000000000125,
 'HE_23': 0.25,
 'CO_24': 4.900000000000006,
 'HE_25': 0.39999999999997726,
 'CO_26': 2.55000000000004,
 'CO_27': 2.900000000000034,
 'CO_28': 4.499999999999943,
 'CA_29': 4.000000000000028,
 'CO_30': 1.3500000000000227,
 'HE_31': 4.35000000000008,
 'HE_32': 1.6000000000000512,
 'CO_33': 2.1500000000000625,
 'HE_34': 2.150000000000034,
 'CO_35': 4.800000000000097,
 'CO_36

In [18]:
comportamenti2 = generate_comportamenti_record(100)
comportamenti2

Unnamed: 0,comportamento,id_utente,Partner_erogante,reward(tk)
0,Acquisto di prodotti sostenibili,CA_286,CONAD,2.55
1,Acquisto energia elettrica da fonti rinnovabili,CA_409,HERA,2.85
2,Acquisto energia elettrica da fonti rinnovabili,CO_218,HERA,2.85
3,Acquisto energia elettrica da fonti rinnovabili,CO_412,HERA,2.85
4,Acquisto di prodotti sostenibili,CO_335,CONAD,2.55
...,...,...,...,...
95,Acquisto di prodotti sostenibili,HE_511,CONAD,2.55
96,Acquisto energia elettrica da fonti rinnovabili,HE_726,HERA,2.85
97,Acquisto piatti e menu sostenibili,HE_282,CAMST,2.40
98,Acquisto piatti e menu sostenibili,HE_657,CAMST,2.40


In [19]:
premi2 = generate_premi_record(10)
premi2

Unnamed: 0,premio,id_utente,Partner_erogante,prezzo(tk)
0,Buono Spesa 5€ Conad,CA_844,CONAD,5
1,Buono Spesa 5€ Conad,,CONAD,10
2,Sconto in bolletta 10€ Hera,,HERA,10
3,Buono Spesa 5€ Conad,CO_982,CONAD,5
4,Buono Spesa 5€ Conad,,CONAD,10
5,Buono Spesa 5€ Conad,CA_409,CONAD,5
6,Sconto in bolletta 10€ Hera,,HERA,10
7,Sconto in bolletta 10€ Hera,,HERA,10
8,Sconto in bolletta 10€ Hera,,HERA,10
9,Buono Spesa 5€ Conad,,CONAD,10
