# Kafka : Produce data into databases

This notebook includes the steps to ingest records from the crimes dataset into MongoDB databases

### Imports

In [1]:
import pandas as pd
import pymongo
from pymongo import MongoClient
import time
import json
import numpy as np
import random

  from cryptography.x509 import load_der_x509_certificate as _load_der_x509_certificate


### Load the dataset

In [2]:
df = pd.read_csv('data/Chicago_Crimes_2022.csv')
pd.set_option('display.max_columns', None)
df.head(10)

Unnamed: 0,ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,Beat,District,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location
0,12747935,JF300179,06/29/2022 02:30:00 PM,016XX W JACKSON BLVD,1150,DECEPTIVE PRACTICE,CREDIT CARD FRAUD,HOSPITAL BUILDING / GROUNDS,0,0,1231,12.0,27.0,28.0,11,1165476.0,1898692.0,2022,10/11/2022 04:45:59 PM,41.877606,-87.667878,"(41.87760556, -87.667877617)"
1,12744058,JF294331,06/26/2022 05:31:00 AM,023XX N LECLAIRE AVE,1477,WEAPONS VIOLATION,RECKLESS FIREARM DISCHARGE,STREET,0,0,2522,25.0,36.0,19.0,15,1142010.0,1914897.0,2022,10/11/2022 04:45:59 PM,41.922541,-87.753637,"(41.922540898, -87.753637344)"
2,12750921,JF305173,07/04/2022 04:13:00 PM,024XX S STATE ST,0486,BATTERY,DOMESTIC BATTERY SIMPLE,VEHICLE NON-COMMERCIAL,1,1,131,1.0,3.0,33.0,08B,1176697.0,1888303.0,2022,10/11/2022 04:45:59 PM,41.848851,-87.626991,"(41.848851341, -87.626991263)"
3,12746079,JF299265,06/29/2022 09:06:00 PM,128XX S PARNELL AVE,143A,WEAPONS VIOLATION,UNLAWFUL POSSESSION - HANDGUN,STREET,1,0,523,5.0,9.0,53.0,15,1174974.0,1819918.0,2022,10/11/2022 04:45:59 PM,41.661233,-87.635351,"(41.661233333, -87.635350952)"
4,12746849,JF300249,06/30/2022 10:16:00 AM,050XX W WOLFRAM ST,1130,DECEPTIVE PRACTICE,FRAUD OR CONFIDENCE GAME,RESIDENCE,0,0,2521,25.0,31.0,19.0,11,1142307.0,1918481.0,2022,10/11/2022 04:45:59 PM,41.93237,-87.752457,"(41.932370244, -87.752456846)"
5,12745543,JF297799,06/28/2022 04:00:00 PM,057XX S MARYLAND AVE,0560,ASSAULT,SIMPLE,HOSPITAL BUILDING / GROUNDS,0,1,235,2.0,5.0,41.0,08A,1182884.0,1867234.0,2022,10/11/2022 04:45:59 PM,41.790895,-87.60494,"(41.790894592, -87.604940233)"
6,12713141,JF259485,05/29/2022 12:00:00 AM,014XX S MUSEUM CAMPUS DR,0870,THEFT,POCKET-PICKING,SPORTS ARENA / STADIUM,0,0,132,1.0,4.0,33.0,06,,,2022,06/05/2022 04:48:59 PM,,,
7,12745635,JF297619,06/28/2022 08:00:00 AM,054XX W 64TH ST,0910,MOTOR VEHICLE THEFT,AUTOMOBILE,STREET,0,0,813,8.0,13.0,64.0,07,1141342.0,1861609.0,2022,10/11/2022 04:45:59 PM,41.776323,-87.757405,"(41.776323462, -87.757404962)"
8,12751464,JF305866,07/04/2022 10:00:00 PM,016XX N MAYFIELD AVE,0810,THEFT,OVER $500,APARTMENT,0,0,2531,25.0,29.0,25.0,06,1136840.0,1910376.0,2022,10/11/2022 04:45:59 PM,41.910229,-87.772742,"(41.91022908, -87.772742424)"
9,12745462,JF297804,06/28/2022 07:00:00 PM,005XX N WELLS ST,0820,THEFT,$500 AND UNDER,RESTAURANT,0,0,1831,18.0,42.0,8.0,06,1174639.0,1903961.0,2022,10/11/2022 04:45:59 PM,41.891864,-87.634076,"(41.891864126, -87.634076111)"


### Connect to MongoDB database

In [3]:
cluster = MongoClient('mongodb+srv://Abdulaziz:CENSORED@mongodemo.kqbhziq.mongodb.net/?retryWrites=true&w=majority')

### Custom class to convert data types that are not supported by MongoDB

In [4]:
class CustomEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, np.integer):
            return int(obj)
        elif isinstance(obj, np.floating):
            return float(obj)
        elif isinstance(obj, np.ndarray):
            return obj.tolist()
        else:
            return super(CustomEncoder, self).default(obj)

### A function to preprocess rows and insert them into MongoDB database

In [5]:
def insert_doc(collection, row):
    row_dict = json.dumps(row.to_dict(), cls=CustomEncoder)
    row_dict  = json.loads(row_dict)
    collection.insert_one(row_dict)

### Insert one document

In [6]:
DB = cluster['Police_Jeddah']
Crimes = DB['Crimes']
insert_doc(Crimes, df.iloc[0])

In [7]:
DB = cluster['Police_Dammam']
Crimes = DB['Crimes']
insert_doc(Crimes, df.iloc[1])

### Randomly and consecutively Insert different documents in all three databases (Riyadh, Jeddah, Dammam)

In [None]:
# Names of police databases
DBs = ['Police_Riyadh', 'Police_Jeddah', 'Police_Dammam']

# Iterate over crime records in the dataset
for i in range(len(df)):
    
    # Get a random database from the three databases above
    random_db = random.randint(0, 2)
    DB = cluster[DBs[random_db]]
    Crimes = DB['Crimes']
    
    # Insert a record in the specified database
    insert_doc(Crimes, df.iloc[i])
    time.sleep(1)

### Delete everything

In [16]:
DB = cluster['Police_Jeddah']
Crimes = DB['Crimes']
Crimes.delete_many({})

DB = cluster['Police_Dammam']
Crimes = DB['Crimes']
Crimes.delete_many({})

DB = cluster['Police_Riyadh']
Crimes = DB['Crimes']
Crimes.delete_many({})

<pymongo.results.DeleteResult at 0x7f1b6951c780>

In [17]:
DB = cluster['NIC']
Crimes = DB['Police_Crimes']
Crimes.delete_many({})

DB = cluster['MOI']
Crimes = DB['Police_Crimes']
Crimes.delete_many({})

<pymongo.results.DeleteResult at 0x7f1b69532f98>