In [148]:
import sys
from os.path import exists
# Smart progress bar reader...
import tqdm
import pandas as pd
import json
from sodapy import Socrata
from elasticsearch import Elasticsearch
from elasticsearch.helpers import streaming_bulk

In [149]:
DATASET_PATH = 'C:/Users/Dean Truong/Jupyter/Elasticsearch/contractors.json'

In [150]:
def download_dataset():
# Downloads the public dataset if not locally downloaded 
# and returns the number of rows are in the .json file.

    if not exists(DATASET_PATH):
        print("JSON file does not exist...")
        print("Downloading dataset from data.lacity.org...")
        
        client = Socrata("data.lacity.org","8YzE0NUTN2pl4gdOcY5fAquLi",username="dtruong8@toromail.csudh.edu",password="Helloworld123.")
        
        results = client.get("yv23-pmwf",select="distinct contractors_business_name", where="applicant_relationship = 'Contractor'", order="contractors_business_name")
        
        # convert to data frame
        df = pd.DataFrame.from_records(results)
        
        df.to_json(r'C:/Users/Dean Truong/Jupyter/Elasticsearch/contractors.json',orient='records')
        print("Dataset downloaded...")
        
        return df.shape[0]
    else:
        df = pd.read_json(DATASET_PATH)
        return df.shape[0]

In [151]:
def create_index(client):
    """Creates an index in Elasticsearch if one isn't already there."""
    if client.indices.exists(index="contractors"):
        print("Error: 'contractor' index exist...")
        print("Deleting...")
        client.indices.delete(index="contractors",ignore=[400, 404])
        print("Successfully deleted index...")
        
    client.indices.create(
        index="contractors",
        body={
            "settings": {"number_of_shards": 1},
            "mappings": {
                "properties": {
                    "names": {"type": "text"},
                }
            },
        },
        ignore=400,
    )
    print("Successfully created contractor index")
        
        

In [152]:
def generate_actions():
# Reads the file through csv.DictReader() and for each row
# yields a single document. This function is passed into the bulk()
# helper to create many documents in sequence.
    with open(DATASET_PATH, mode="r") as f:
        data = json.load(f)
        f.close()
        i = 0
        for names in data:
            doc = {
                "_id": i + 1,
                "name": names["contractors_business_name"]
            }
            i = i + 1
            yield doc

In [155]:
def main():
    client = Elasticsearch(['localhost:9200'])
    # print(es.info()) # verification
    
    number_of_docs = download_dataset()
    
    print("Creating index an index...")
    create_index(client)
    
    print("Indexing documents...")
    progress = tqdm.tqdm(unit="docs", total=number_of_docs)
    successes = 0
    for ok, action in streaming_bulk(
        client = client, index="contractors", actions=generate_actions(),
    ):
        progress.update(1)
        successes += ok
    print("Indexed %d/%d documents" % (successes, number_of_docs))
    

In [156]:
main()



  0%|          | 0/1000 [00:00<?, ?docs/s][A[A

Creating index an index...
Error: 'contractor' index exist...
Deleting...
Successfully deleted index...
Successfully created contractor index
Indexing documents...


100%|██████████| 1000/1000 [00:00<00:00, 11358.80docs/s]

Indexed 1000/1000 documents



