In [8]:

from typing import List, Dict
import csv
from pathlib import Path
from zipfile import ZipFile, ZIP_BZIP2
import random

from generators import TYPES_TO_GENERATORS, long_text, username
from schemas import (
    CUSTOMERS_SCHEMA,
    PEOPLE_SCHEMA,
    ORGANIZATIONS_SCHEMA,
    PRODUCTS_SCHEMA,
    OFFERS_SCHEMA,
    LEADS_SCHEMA
)

In [9]:
# Schema Dictionary

# Lookup table from the schema key accepted by the generator functions to the
# schema definition imported from the `schemas` module.
SCHEMA_TO_DICT = dict(
    customers=CUSTOMERS_SCHEMA,
    leads=LEADS_SCHEMA,
    people=PEOPLE_SCHEMA,
    organizations=ORGANIZATIONS_SCHEMA,
    products=PRODUCTS_SCHEMA,
    offers=OFFERS_SCHEMA,
)

In [10]:
# Find Index in Schema

def find_index(list_values: List[Dict], name: str) -> int:
    """Return the row position of the schema field called ``name``.

    The result is the position of the first matching entry in
    ``list_values`` plus one. The offset exists because generated rows
    start with an extra "Index" column that is not part of the schema.

    Args:
        list_values: Schema entries; each is looked up with ``.get("name")``.
        name: Field name to search for.

    Returns:
        One-based position of the first entry whose "name" equals ``name``.

    Raises:
        IndexError: If no entry has the requested name.
    """
    matching_positions = []
    for position, field in enumerate(list_values):
        if field.get("name") == name:
            matching_positions.append(position)

    if not matching_positions:
        raise IndexError("No index found for {}".format(name))

    first_match = matching_positions[0]
    return first_match + 1

In [11]:
# Duplicate Variation

def add_small_variation_to_duplicates(row: List, schema: str) -> List:
    """Return a slightly altered copy of a duplicated ``leads`` row.

    Rows of any other schema are returned unchanged (the very same object).
    For ``leads`` rows two independent random draws in the range 1..4 decide
    which variations are applied to a copy of ``row``:

    * first draw <= 2: the last name is shortened to its initial plus "."
      and "Email 1" / "Email 2" are swapped;
    * first draw == 3: the website scheme is toggled between ``http://``
      and ``https://``;
    * first draw >= 2: "Phone 1" / "Phone 2" are swapped;
    * first draw >= 3: "Notes" is replaced with ``long_text()``;
    * second draw >= 3: one randomly chosen email gets a
      ``+<username()>`` tag inserted before its "@".

    Args:
        row: Generated row, including the leading "Index" column.
        schema: Schema key; only ``'leads'`` rows are modified.

    Returns:
        ``row`` itself for non-leads schemas, otherwise a modified copy.

    Raises:
        IndexError: Propagated from ``find_index`` when ``LEADS_SCHEMA``
            lacks one of the required fields.
    """
    if schema != 'leads':
        return row

    last_name_index = find_index(LEADS_SCHEMA, "Last Name")
    website_index = find_index(LEADS_SCHEMA, "Website")
    email_1_index = find_index(LEADS_SCHEMA, "Email 1")
    email_2_index = find_index(LEADS_SCHEMA, "Email 2")
    phone_1_index = find_index(LEADS_SCHEMA, "Phone 1")
    phone_2_index = find_index(LEADS_SCHEMA, "Phone 2")
    note_index = find_index(LEADS_SCHEMA, "Notes")

    new_row = row.copy()
    lucky_number = random.randrange(1, 5)
    lucky_number_2 = random.randrange(1, 5)

    if lucky_number <= 2:
        last_name = new_row[last_name_index]
        new_row[last_name_index] = "{}.".format(last_name[:1])

    if lucky_number == 3:
        website = new_row[website_index]
        # Toggle only the leading scheme. A blanket str.replace would also
        # rewrite "http" inside the host or path (e.g. "httpbin.org").
        if website.startswith("https://"):
            website = "http://" + website[len("https://"):]
        elif website.startswith("http://"):
            website = "https://" + website[len("http://"):]
        new_row[website_index] = website

    if lucky_number <= 2:
        new_row[email_1_index], new_row[email_2_index] = (
            new_row[email_2_index],
            new_row[email_1_index],
        )

    if lucky_number_2 >= 3:
        email_to_change_index = random.choice([email_1_index, email_2_index])
        # rpartition keeps everything after the last "@" as the domain and
        # yields an empty separator when there is no "@" at all, so empty or
        # malformed addresses are left untouched instead of raising.
        local_part, at_sign, domain = new_row[email_to_change_index].rpartition('@')
        if at_sign:
            new_row[email_to_change_index] = "".join([
                local_part,
                "+",
                username(),
                "@",
                domain
            ])

    if 2 <= lucky_number <= 4:
        new_row[phone_1_index], new_row[phone_2_index] = (
            new_row[phone_2_index],
            new_row[phone_1_index],
        )

    if lucky_number >= 3:
        new_row[note_index] = long_text()

    return new_row

In [12]:
# Generate CSV File

def _discard_partial_file(path):
    """Delete ``path`` if it exists; used to clean up an interrupted write."""
    if path.exists():
        path.unlink()


def generate_file(schema='customers', name="customers", count=1000000, duplicate_ratio=0.0):
    """Generate a CSV of synthetic rows for ``schema`` and a BZIP2 zip of it.

    Files are written to ``<cwd>/files/<schema>/<name>.csv`` and
    ``<name>.zip``. When ``duplicate_ratio`` is positive the base name gets a
    ``-duplicates-<ratio>`` suffix. A file that already exists is not
    regenerated. Both files are first written to a ``.part`` sibling and only
    renamed on success, so an interrupted run never leaves a truncated file
    that a later run would mistake for a finished one.

    Args:
        schema: Key into ``SCHEMA_TO_DICT``; also the output sub-directory.
        name: Base file name (without extension).
        count: Number of data rows to write (a header row is added).
        duplicate_ratio: Per-row probability of emitting a copy of a
            previously generated row (passed through
            ``add_small_variation_to_duplicates``) instead of a fresh one.

    Raises:
        KeyError: If ``schema`` is not in ``SCHEMA_TO_DICT`` or a field type
            has no entry in ``TYPES_TO_GENERATORS``.
    """
    if duplicate_ratio > 0:
        name = f"{name}-duplicates-{duplicate_ratio}"

    print(f"Generating file for: {schema} - {name} - {count} - {duplicate_ratio}")

    p = Path().resolve() / "files" / schema
    p.mkdir(parents=True, exist_ok=True)

    file_name = f"{name}.csv"
    file_path = p / file_name

    if not file_path.exists():
        # Resolve everything that can fail on bad configuration before any
        # file is opened.
        schema_dict = SCHEMA_TO_DICT[schema]
        headers = ["Index"] + [elem['name'] for elem in schema_dict]
        data_generators = [TYPES_TO_GENERATORS[elem['type']] for elem in schema_dict]
        unique_fields = [elem.get('unique', False) for elem in schema_dict]
        generated_unique_values = {i: set() for i, v in enumerate(unique_fields) if v}

        # Earlier rows are only needed as a pool to duplicate from; without
        # duplicates, keeping them would just hold every row in memory.
        keep_rows_for_duplicates = duplicate_ratio > 0
        generated_rows = []

        partial_path = p / f"{file_name}.part"
        try:
            with open(partial_path, 'w', newline='', encoding='utf-8') as file:
                writer = csv.writer(file)
                writer.writerow(headers)

                rows = []
                for index in range(1, count + 1):
                    # Drawn on every iteration, even without duplicates.
                    random_pick = random.random()
                    add_randomness = random_pick < duplicate_ratio
                    if duplicate_ratio > 0 and generated_rows and add_randomness:
                        row = random.choice(generated_rows)
                        row = [index] + row[1:]
                        row = add_small_variation_to_duplicates(row, schema)
                    else:
                        row = [index]
                        for i, gen in enumerate(data_generators):
                            val = gen()
                            if unique_fields[i]:
                                # NOTE(review): this never terminates if the
                                # generator cannot produce a new value.
                                while val in generated_unique_values[i]:
                                    val = gen()
                                generated_unique_values[i].add(val)
                            row.append(val)
                        if keep_rows_for_duplicates:
                            generated_rows.append(row)
                    rows.append(row)

                    # Flush in batches of 1000 rows to bound the write buffer.
                    if index % 1000 == 0:
                        writer.writerows(rows)
                        rows = []

                    if index % 10000 == 0:
                        print(f"{index}/{count}")

                writer.writerows(rows)
            partial_path.replace(file_path)
        except BaseException:
            # Includes KeyboardInterrupt (kernel interrupt): drop the partial
            # file so the next run regenerates it.
            _discard_partial_file(partial_path)
            raise
    else:
        print(f"{file_path} already exists")

    file_name_zip = f"{name}.zip"
    file_path_zip = p / file_name_zip
    if not file_path_zip.exists():
        partial_zip_path = p / f"{file_name_zip}.part"
        try:
            with ZipFile(partial_zip_path, 'w', ZIP_BZIP2) as zipObj:
                zipObj.write(filename=file_path, arcname=file_name)
            partial_zip_path.replace(file_path_zip)
        except BaseException:
            _discard_partial_file(partial_zip_path)
            raise

In [13]:
# Sample Call

# Small smoke run: ten rows of the customers schema, no duplicates.
generate_file(schema='customers', name='customers-sample', count=10)

Generating file for: customers - customers-sample - 10 - 0.0
C:\Users\zahid\Downloads\python\fiverr\survey\files\customers\customers-sample.csv already exists
