In [1]:
import pandas as pd 
import numpy as np
import json
import matplotlib.pyplot as plt
import re
import sys
import os
import time

pd.options.display.max_columns = 1000
pd.options.display.max_rows = 10

%matplotlib inline

base_path = "seeds/"

In [2]:
companies = pd.read_csv(base_path + "companylist.csv")
companies.head()

Unnamed: 0,Symbol,Name,LastSale,MarketCap,ADR TSO,IPOyear,Sector,Industry,Summary Quote,Unnamed: 9
0,YI,"111, Inc.",13.71,98369250.0,7175000.0,2018.0,Health Care,Medical/Nursing Services,https://www.nasdaq.com/symbol/yi,
1,PIH,"1347 Property Insurance Holdings, Inc.",6.25,37404790.0,,2014.0,Finance,Property-Casualty Insurers,https://www.nasdaq.com/symbol/pih,
2,PIHPP,"1347 Property Insurance Holdings, Inc.",25.3101,0.0,,,Finance,Property-Casualty Insurers,https://www.nasdaq.com/symbol/pihpp,
3,TURN,180 Degree Capital Corp.,2.25,70023510.0,,,Finance,Finance/Investors Services,https://www.nasdaq.com/symbol/turn,
4,FLWS,"1-800 FLOWERS.COM, Inc.",10.65,688032800.0,,1999.0,Consumer Services,Other Specialty Stores,https://www.nasdaq.com/symbol/flws,


In [3]:
names = pd.read_csv(base_path + "first_names.csv")
names.sample(10)

Unnamed: 0,Name,Gender,Count
77206,Rahmeek,M,35
71617,Neshia,F,81
60393,Loella,F,30
67847,Mikisha,F,101
9586,Audryana,F,99
51940,Kenosha,F,377
91530,Tannisha,F,66
87129,Shine,M,86
13698,Briceidy,F,25
97430,Vandan,M,44


In [4]:
addresses = pd.read_csv(base_path + "addresses.csv")
addresses.head()

Unnamed: 0,Name of Institution,Street Address,City,ZIP Code,County,Location 1
0,Abacus Federal Savings Bank,36-30 Main Street,Flushing,11354.0,Queens,"36 30 Main Street\nFlushing, NY 11354\n"
1,Adirondack Bank,448 Route 3,Plattsburgh,12901.0,Clinton,"448 Route\nPlattsburgh, NY 12901\n"
2,Adirondack Bank,13150 State Route 12,Boonville,13309.0,Oneida,"13150 State Route\nBoonville, NY 13309\n"
3,Adirondack Bank,Utica College,Utica,13501.0,Oneida,
4,"Adirondack Trust Company, The",112 Broadway,Saratoga Springs,12866.0,Saratoga,"112 Broadway\nSaratoga Springs, NY 12866\n"


In [5]:
domains = pd.read_csv(base_path + "top_domains.csv", header=None)[1]
domains.head(10)

0       google.com
1      youtube.com
2     facebook.com
3        baidu.com
4    wikipedia.org
5        yahoo.com
6           qq.com
7       taobao.com
8        tmall.com
9     google.co.in
Name: 1, dtype: object

In [6]:
class GenericData():
    
    _record_count_by_object = {}
    _code_by_object_name = {}
    _object_name_by_code = {}
    
    def __init__(self, object_name):
        count = GenericData._record_count_by_object.get(object_name, 0)
        object_code = GenericData._to_object_code(object_name)
        self.id =  "%s%010d" % (object_code, count)
        GenericData._record_count_by_object[object_name] = count + 1
            
    def __repr__(self):
        return self.id
    
    @staticmethod
    def save(records, filename):
        with open(filename, "w") as f:
            for r in records:
                f.write(str(r) + "\n")
    
    @staticmethod
    def reset():
        GenericData._record_count_by_object.clear()
        GenericData._code_by_object_name.clear()
        GenericData._object_name_by_code.clear()

    @staticmethod
    def sample(object_name, count):
        if not object_name in GenericData._code_by_object_name:
            raise ValueError("Invalid object name %s" % object_name) 
        
        object_code = GenericData._code_by_object_name[object_name]
        record_count = GenericData._record_count_by_object[object_name]
        indices = pd.Series(np.arange(record_count)).sample(count, replace = True)
        object_ids = ["%s%010d" % (object_code, i) for i in indices]
        return object_ids
    
    @staticmethod
    def rand_by_range(lower, upper, count): 
        return np.random.random(int(count)) * (upper - lower) + lower

    @staticmethod
    def generate_double(minimum, maximum, bins, count = None):
        bin_count = len(bins)
        
        if count is not None:
            bins = np.array(bins)
            bins = np.ceil((bins / np.sum(bins) * count))
        
        intervals = np.linspace(minimum, maximum, bin_count + 1)
        y = np.arange(0)
        for i in range(bin_count):
            lower, upper = intervals[i], intervals[i+1]
            count = bins[i]
            y = np.concatenate((y, GenericData.rand_by_range(lower, upper, count)))
        return list(np.array(y))
    
    @staticmethod
    def _to_object_code(s):
        import string
        
        if s in GenericData._code_by_object_name:
            return GenericData._code_by_object_name[s]
        
        alpha_digits =  string.digits + string.ascii_uppercase
        v = hash(s.strip().lower()) % (len(alpha_digits) ** 2)
        digit1 = v // 36
        digit0 = v % 36
        object_code = "".join([alpha_digits[digit1], alpha_digits[digit0]])
        
        GenericData._code_by_object_name[s] = object_code
        GenericData._object_name_by_code[object_code] = s
        return object_code

sample = GenericData("sample_object")
sample.id

'1L0000000000'

In [7]:
class Address(GenericData):
    
    _addresses = pd.read_csv(base_path + "addresses.csv")
    
    def __init__(self, street, city, zipcode, state, county = None):
        GenericData.__init__(self, "address")
        self.street = street
        self.city = city 
        self.zipcode = zipcode
        self.state = state
        self.county = county
    
    def __repr__(self):
        record = (self.id, self.street, self.city, self.zipcode, self.state, self.county)
        return json.dumps(self.__dict__)
   

    @staticmethod
    def generate_addresses(count):
        addresses = Address._addresses
        addresses = addresses.sample(count, replace = True)
        records = []
        for i in range(count):
            record = Address(
                list(addresses["Street Address"])[i],
                list(addresses["City"])[i],
                list(addresses["ZIP Code"])[i],
                "NY",
                list(addresses["County"])[i],
            )
            records.append(record)
        return records
            
addresses = Address.generate_addresses(3)  
addresses

[{"id": "I40000000000", "street": "315 Lafayette Street", "city": "New York", "zipcode": 10012.0, "state": "NY", "county": "New York"},
 {"id": "I40000000001", "street": "132 Parkway Road", "city": "Bronxville", "zipcode": 10708.0, "state": "NY", "county": "Westchester"},
 {"id": "I40000000002", "street": "120 North Street", "city": "Dryden", "zipcode": 13053.0, "state": "NY", "county": "Tompkins"}]

In [8]:
GenericData._record_count_by_object

{'sample_object': 1, 'address': 3}

In [9]:
class Person(GenericData):
    
    _first_names = pd.Series(pd.read_csv(base_path + "first_names.csv").Name.unique())
    _last_names = pd.read_csv(base_path + "last_names.csv", header = None)
    
    def __init__(self, first_name, last_name, age, dob, email, gender, address):
        GenericData.__init__(self, "person")
        self.first_name = first_name
        self.last_name = last_name
        self.age = age
        self.dob = dob
        self.email = email
        self.address = address
        self.gender = gender
    
    def __repr__(self):
        record = (self.id, self.first_name, self.last_name
                      , self.dob, self.age, self.email, self.gender, self.address)
        return json.dumps(self.__dict__)
        
    @staticmethod
    def generate_records(count):
        
        first_names = list(Person._first_names.sample(count, replace=True))
        last_names = list(Person._last_names.sample(count, replace=True)[0])
        last_names = [str(s).capitalize() for s in last_names]
        
        ages = np.random.randint(low = 18, high = 90, size = count) 
        days_since = ages * 365 + np.random.randint(low = 0, high = 365, size = count)

        today = pd.to_datetime("today")
        dobs = today - pd.to_timedelta(days_since, unit="D")
        dobs = [d.strftime("%Y-%m-%d") for d in dobs.date]
        
        email_domains = ["gmail", "msn", "hotmail", "yahoo"]
        
        genders = list(pd.Series(np.array(["M", "F"])).sample(count, replace = True))
        
        emails = []
        
        for fname, lname in zip(first_names, last_names):
            domain = email_domains[np.random.randint(0, len(email_domains) - 1, 1)[0]]
            fname = re.sub(r"[^a-z]", "", fname.lower())
            lname = re.sub(r"[^a-z]", "", lname.lower())
            if np.random.random() > 0.5:
                emails.append("%s.%s@%s.com" % (fname, lname, domain))
            else:
                emails.append("%s%s@%s.com" % (fname[0], lname, domain))
        
        addresses = Address.generate_addresses(count)
        
        records = []
        for i in range(count):
            record = Person(first_names[i], last_names[i]
                            , int(ages[i]), dobs[i], emails[i]
                            , genders[i], addresses[i].__dict__)
            records.append(record)
        return records

persons = Person.generate_records(3)
persons

[{"id": "BL0000000000", "first_name": "Danitza", "last_name": "Nolte", "age": 31, "dob": "1987-10-07", "email": "danitza.nolte@msn.com", "address": {"id": "I40000000003", "street": "329 First Avenue", "city": "New York", "zipcode": 10003.0, "state": "NY", "county": "New York"}, "gender": "F"},
 {"id": "BL0000000001", "first_name": "Strauss", "last_name": "Vieira", "age": 57, "dob": "1961-08-21", "email": "strauss.vieira@gmail.com", "address": {"id": "I40000000004", "street": "1995 New Highway", "city": "Farmingdale", "zipcode": 11735.0, "state": "NY", "county": "Nassau"}, "gender": "M"},
 {"id": "BL0000000002", "first_name": "Madyson", "last_name": "Gerling", "age": 18, "dob": "2000-07-13", "email": "madyson.gerling@gmail.com", "address": {"id": "I40000000005", "street": "11 Megan Drive", "city": "Geneseo", "zipcode": 14454.0, "state": "NY", "county": "Livingston"}, "gender": "F"}]

In [10]:
GenericData._record_count_by_object

{'sample_object': 1, 'address': 6, 'person': 3}

In [11]:
pd.DataFrame.from_records([d.__dict__ for d in persons])

Unnamed: 0,address,age,dob,email,first_name,gender,id,last_name
0,"{'id': 'I40000000003', 'street': '329 First Av...",31,1987-10-07,danitza.nolte@msn.com,Danitza,F,BL0000000000,Nolte
1,"{'id': 'I40000000004', 'street': '1995 New Hig...",57,1961-08-21,strauss.vieira@gmail.com,Strauss,M,BL0000000001,Vieira
2,"{'id': 'I40000000005', 'street': '11 Megan Dri...",18,2000-07-13,madyson.gerling@gmail.com,Madyson,F,BL0000000002,Gerling


In [12]:
class Merchant(GenericData):
    
    _companies = pd.read_csv(base_path + "companylist.csv")
    
    def __init__(self, name):
        GenericData.__init__(self, "merchant")
        self.name = name
    
    def __repr__(self):
        return json.dumps(self.__dict__)

    @staticmethod
    def generate_records(count):
        count = min(count, len(Merchant._companies))
        names = list(Merchant._companies.sample(count, replace = False).Name)
        records = []
        for i in range(count):
            record = Merchant(names[i])
            records.append(record)
        return records
    
Merchant.generate_records(10)    

[{"id": "I20000000000", "name": "SunPower Corporation"},
 {"id": "I20000000001", "name": "Energous Corporation"},
 {"id": "I20000000002", "name": "Texas Capital Bancshares, Inc."},
 {"id": "I20000000003", "name": "Community West Bancshares"},
 {"id": "I20000000004", "name": "Infinera Corporation"},
 {"id": "I20000000005", "name": "Minerva Neurosciences, Inc"},
 {"id": "I20000000006", "name": "Roku, Inc."},
 {"id": "I20000000007", "name": "Landcadia Holdings, Inc."},
 {"id": "I20000000008", "name": "ExlService Holdings, Inc."},
 {"id": "I20000000009", "name": "Eagle Financial Bancorp, Inc."}]

In [13]:
GenericData._record_count_by_object

{'sample_object': 1, 'address': 6, 'person': 3, 'merchant': 10}

In [14]:
class Transaction(GenericData):
    
    _types = ["pos", "atm", "mobile", "web"]
    
    def __init__(self, customer_id, merchant_id, amount, timestamp, category):
        super().__init__("transaction")
        self.customer_id = customer_id
        self.merchant_id = merchant_id
        self.amount = amount
        self.timestamp = timestamp
        self.category = category
    
    def __repr__(self):
        return json.dumps(self.__dict__)
    
    @staticmethod
    def generate_records(count):
        customer_ids = GenericData.sample("person", count)
        merchant_ids = GenericData.sample("merchant", count)
        amounts = GenericData.generate_double(0, 10000, [0.1, 0.3, 0.4, 0.4, 0.1, 0.1, 0.01], count)
        categories = list(pd.Series(Transaction._types).sample(count, replace = True))
        

        time_deltas = pd.to_timedelta(np.random.randint(10, 6 * 30 * 24 * 3600, count), "S")        
        timestamps = (pd.to_datetime("today") - time_deltas)
        timestamps = timestamps.astype(np.int32)
        timestamps = list(timestamps)
        
        records = []
        for i in range(count):
            record = Transaction(customer_ids[i], merchant_ids[i], amounts[i], timestamps[i], categories[i])
            records.append(record)
        return records
        
    
transactions = Transaction.generate_records(10)
transactions

[{"id": "SN0000000000", "customer_id": "BL0000000000", "merchant_id": "I20000000000", "amount": 501.2971550613497, "timestamp": 1528122398617621000, "category": "mobile"},
 {"id": "SN0000000001", "customer_id": "BL0000000002", "merchant_id": "I20000000000", "amount": 1893.7580707747409, "timestamp": 1528719623617621000, "category": "web"},
 {"id": "SN0000000002", "customer_id": "BL0000000002", "merchant_id": "I20000000007", "amount": 1645.6317896427686, "timestamp": 1531896087617621000, "category": "web"},
 {"id": "SN0000000003", "customer_id": "BL0000000000", "merchant_id": "I20000000005", "amount": 1535.3045886869918, "timestamp": 1532510826617621000, "category": "web"},
 {"id": "SN0000000004", "customer_id": "BL0000000002", "merchant_id": "I20000000007", "amount": 4015.4357431728567, "timestamp": 1525358152617621000, "category": "atm"},
 {"id": "SN0000000005", "customer_id": "BL0000000000", "merchant_id": "I20000000001", "amount": 3843.229113812249, "timestamp": 1534466646617621000,

In [15]:
GenericData._record_count_by_object

{'sample_object': 1,
 'address': 6,
 'person': 3,
 'merchant': 10,
 'transaction': 10}

In [16]:
pd.DataFrame.from_records([d.__dict__ for d in transactions])

Unnamed: 0,amount,category,customer_id,id,merchant_id,timestamp
0,501.297155,mobile,BL0000000000,SN0000000000,I20000000000,1528122398617621000
1,1893.758071,web,BL0000000002,SN0000000001,I20000000000,1528719623617621000
2,1645.63179,web,BL0000000002,SN0000000002,I20000000007,1531896087617621000
3,1535.304589,web,BL0000000000,SN0000000003,I20000000005,1532510826617621000
4,4015.435743,atm,BL0000000002,SN0000000004,I20000000007,1525358152617621000
5,3843.229114,pos,BL0000000000,SN0000000005,I20000000001,1534466646617621000
6,4023.149048,web,BL0000000000,SN0000000006,I20000000002,1534582794617621000
7,5311.422393,atm,BL0000000000,SN0000000007,I20000000000,1528215647617621000
8,5423.109767,mobile,BL0000000001,SN0000000008,I20000000002,1525018151617621000
9,5314.380199,mobile,BL0000000002,SN0000000009,I20000000001,1523587931617621000


In [17]:
%%time 

GenericData.reset()
persons = Person.generate_records(10000)
merchants = Merchant.generate_records(100)
transactions = Transaction.generate_records(10 ** 6)

os.makedirs("target", exist_ok=True)

GenericData.save(persons, "target/customers.json")
GenericData.save(merchants, "target/merchants.json")
GenericData.save(transactions, "target/transactions.json")

print("Record counts: ", GenericData._record_count_by_object)

print("Size in memory", sys.getsizeof(persons), sys.getsizeof(merchants), sys.getsizeof(transactions))

Record counts:  {'address': 10000, 'person': 10000, 'merchant': 100, 'transaction': 1000000}
Size in memory 87624 912 8697464
CPU times: user 12.5 s, sys: 1.26 s, total: 13.8 s
Wall time: 13.9 s


In [18]:
last_names = list(Person._last_names.sample(100, replace=True)[0])
last_names = [s.capitalize() for s in last_names]
last_names

['Siroka',
 'Greenan',
 'Koberle',
 'Vandermeulen',
 'Lomovic',
 'Crater',
 'Goudaliez',
 'Valtzis',
 'Zaki',
 'Bellion',
 'Vivenzio',
 'Keeling',
 'Tafarel',
 'Borchardt',
 'T johnston',
 'Rowert',
 'Van phelps',
 'Uten',
 'Nakada',
 'Lovell',
 'Witmer',
 'Al shuhail',
 'Fasano',
 'Kubik',
 'Nickolson',
 'Bernard',
 'Lovern',
 'Di giovanni',
 'Borisee',
 'Chiambretti',
 'Horandel',
 'Papar',
 'Zebrowski',
 'Mitra',
 'Che abas',
 'Rogaski',
 'Knisely',
 'Ver elst',
 'Chartier',
 'Greenland',
 'Zanutto',
 'Yoon',
 'Igelbusch',
 'Burnsworth',
 'Wirt',
 'Lassalle',
 'Cemke',
 'Keegans',
 'Imran',
 'Forbis',
 'Van  mourik',
 'Klement',
 'Garven',
 'Kuftic',
 'Orechowics',
 'Tampe',
 'Goza',
 'Tham',
 'Boucourt',
 'Puljan',
 'Mitre',
 'Mainz',
 'Thorington',
 'Hargrave',
 'Zuur',
 'Mafrici',
 'Trenchard',
 'Christiana',
 'Mngodo',
 'Husy',
 'Des grottes',
 'Rückgauer',
 'Spandri',
 'Hector',
 'Chateau',
 'Deckbar',
 'Bell',
 'Finardi',
 'Froeschner',
 'Laureano',
 'Fraunberger',
 'Bona',
 '

In [19]:
persons_df = pd.DataFrame.from_records([d.__dict__ for d in persons])
persons_df.head()

Unnamed: 0,address,age,dob,email,first_name,gender,id,last_name
0,"{'id': 'I40000000000', 'street': '20991 NYS Ro...",34,1984-06-03,rcoldron@gmail.com,Roanan,F,BL0000000000,Coldron
1,"{'id': 'I40000000001', 'street': ' 375 Fairpor...",61,1957-10-02,hparupalli@msn.com,Honesty,F,BL0000000001,Parupalli
2,"{'id': 'I40000000002', 'street': ' One Hunting...",72,1945-11-28,sojulari@msn.com,Stevana,M,BL0000000002,Ojulari
3,"{'id': 'I40000000003', 'street': '1833 Hylan B...",24,1993-11-12,ntriggs@msn.com,Naturi,M,BL0000000003,Triggs
4,"{'id': 'I40000000004', 'street': '729 Saw Mill...",72,1946-05-21,karliah.oestreicher@hotmail.com,Karliah,M,BL0000000004,Oestreicher


In [20]:
merchants_df = pd.DataFrame.from_records([d.__dict__ for d in merchants])
merchants_df.head()

Unnamed: 0,id,name
0,I20000000000,People&#39;s Utah Bancorp
1,I20000000001,Enstar Group Limited
2,I20000000002,"Second Sight Medical Products, Inc."
3,I20000000003,Innospec Inc.
4,I20000000004,"Kiniksa Pharmaceuticals, Ltd."


In [21]:
transactions_df = pd.DataFrame.from_records([d.__dict__ for d in transactions])
transactions_df.head()

Unnamed: 0,amount,category,customer_id,id,merchant_id,timestamp
0,1071.695509,atm,BL0000005982,SN0000000000,I20000000030,1537649859158400000
1,885.087978,mobile,BL0000001178,SN0000000001,I20000000088,1530736782158400000
2,511.936337,web,BL0000001844,SN0000000002,I20000000098,1531113330158400000
3,1043.015866,pos,BL0000001452,SN0000000003,I20000000084,1529638358158400000
4,1425.628178,pos,BL0000006516,SN0000000004,I20000000050,1531670149158400000


In [22]:
transactions_df.customer_id.value_counts()

BL0000007375    141
BL0000005437    139
BL0000003711    138
BL0000006858    137
BL0000002348    136
               ... 
BL0000006624     68
BL0000009314     67
BL0000009868     67
BL0000001013     64
BL0000009862     62
Name: customer_id, Length: 10000, dtype: int64

In [23]:
transactions_df.customer_id.unique().shape

(10000,)

# Send the transactions data to Kafka

In [24]:
rate_eps = 100 
wait_interval = 1.0 / rate_eps
for tnx in transactions[:10]:
    print(tnx)
    time.sleep(wait_interval)
    # call a method to send the tnx to kafka

{"id": "SN0000000000", "customer_id": "BL0000005982", "merchant_id": "I20000000030", "amount": 1071.6955093695692, "timestamp": 1537649859158400000, "category": "atm"}
{"id": "SN0000000001", "customer_id": "BL0000001178", "merchant_id": "I20000000088", "amount": 885.0879784808394, "timestamp": 1530736782158400000, "category": "mobile"}
{"id": "SN0000000002", "customer_id": "BL0000001844", "merchant_id": "I20000000098", "amount": 511.93633697401555, "timestamp": 1531113330158400000, "category": "web"}
{"id": "SN0000000003", "customer_id": "BL0000001452", "merchant_id": "I20000000084", "amount": 1043.0158658120042, "timestamp": 1529638358158400000, "category": "pos"}
{"id": "SN0000000004", "customer_id": "BL0000006516", "merchant_id": "I20000000050", "amount": 1425.628177657675, "timestamp": 1531670149158400000, "category": "pos"}
{"id": "SN0000000005", "customer_id": "BL0000005348", "merchant_id": "I20000000087", "amount": 102.03932744385581, "timestamp": 1531657930158400000, "category"

In [None]:
import io
import random
import time
from kafka import KafkaProducer
import json

# To send messages synchronously
producer = KafkaProducer(bootstrap_servers = "localhost:9092"
                         , linger_ms = 10
                         , compression_type = "gzip")

# Kafka topic
topic = "demo"

rate_eps = 1
wait_interval = 1.0 / rate_eps
for tnx in transactions[:10]:
    print(tnx)
    producer.send(topic, str(tnx).encode("utf-8"))
    time.sleep(wait_interval)
    #producer.flush()

{"id": "SN0000000000", "customer_id": "BL0000005982", "merchant_id": "I20000000030", "amount": 1071.6955093695692, "timestamp": 1537649859158400000, "category": "atm"}
{"id": "SN0000000001", "customer_id": "BL0000001178", "merchant_id": "I20000000088", "amount": 885.0879784808394, "timestamp": 1530736782158400000, "category": "mobile"}
{"id": "SN0000000002", "customer_id": "BL0000001844", "merchant_id": "I20000000098", "amount": 511.93633697401555, "timestamp": 1531113330158400000, "category": "web"}
{"id": "SN0000000003", "customer_id": "BL0000001452", "merchant_id": "I20000000084", "amount": 1043.0158658120042, "timestamp": 1529638358158400000, "category": "pos"}
{"id": "SN0000000004", "customer_id": "BL0000006516", "merchant_id": "I20000000050", "amount": 1425.628177657675, "timestamp": 1531670149158400000, "category": "pos"}
{"id": "SN0000000005", "customer_id": "BL0000005348", "merchant_id": "I20000000087", "amount": 102.03932744385581, "timestamp": 1531657930158400000, "category"