In [1]:
! pip install faker



In [8]:
import asyncio
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import random
import string
from faker import Faker
from datetime import datetime
import io
import nest_asyncio
from google.cloud import storage
from google.oauth2 import service_account
from concurrent.futures import ThreadPoolExecutor

# Helper function to generate random IDs
def id_generator(size=6, chars=string.ascii_uppercase + string.digits):
    return ''.join(random.SystemRandom().choice(string.ascii_lowercase + string.digits) for _ in range(size))

# Generate project IDs
projects = [f"{id_generator(4)}" for _ in range(4)]

# Generate tenant IDs
Faker.seed(100)
fake = Faker()
tenants = [id_generator(4) for _ in range(10)]

events = ["unfreeze", "depressurize", "warm", "", "pressurize", "normalize"]

# GCP configuration
gcp_bucket_name = "myfirstproject-inputbucket"
gcp_credentials_path = "double-balm-438113-f5-53067ace2784.json"

nest_asyncio.apply()

# Define schema (same as before)
humidity_schema = pa.struct([
    ('temperature', pa.float64()),
    ('relativeHumidity', pa.float64()),
    ('updateTime', pa.timestamp('ms'))
])

event_data_schema = pa.struct([
    ('humidity', humidity_schema)
])

event_schema = pa.struct([
    ('eventId', pa.string()),
    ('targetName', pa.string()),
    ('eventType', pa.string()),
    ('data', event_data_schema),
    ('timestamp', pa.timestamp('ms'))
])

metadata_schema = pa.struct([
    ('deviceId', pa.string()),
    ('projectId', pa.string()),
    ('deviceType', pa.string()),
    ('productNumber', pa.string())
])

data_schema = pa.struct([
    ('event', event_schema),
    ('metadata', metadata_schema)
])

schema = pa.schema([
    ('day', pa.int64()),
    ('month', pa.int64()),
    ('year', pa.int64()),
    ('tenantId', pa.string()),
    ('eventType', pa.string()),
    ('eventId', pa.string()),
    ('data', data_schema)
])

def upload_to_gcs(bucket, blob_name, data):
    blob = bucket.blob(blob_name)
    blob.upload_from_string(data)
    print(f'Successfully uploaded file to GCS: {blob_name}')

async def generate_and_upload_data(i, bucket, executor):
    # Generate sample data (same as before)
    day = fake.day_of_month()
    month = fake.month()
    year = fake.year()
    tenantId = random.choice(tenants)
    project = random.choice(projects)
    eventType = random.choice(events)
    eventId = f"{tenantId}_{id_generator(4)}"
    update_time = fake.date_time_this_year()
    
    sample_json_data = [{
        "day": day,
        "month": month,
        "year": year,
        "tenantId": tenantId,
        "eventType": eventType,
        "eventId": eventId,
        "data": {
            "event": {
                "eventId": eventId,
                "targetName": f"projects/{project}/devices/{eventId}adg",
                "eventType": eventType,
                "data": {
                    "humidity": {
                        "temperature": round(random.uniform(10.0, 40.0), 2),
                        "relativeHumidity": round(random.uniform(30.0, 70.0), 2),
                        "updateTime": update_time
                    }
                },
                "timestamp": update_time
            },
            "metadata": {
                "deviceId": f"{eventId}adg",
                "projectId": project,
                "deviceType": eventType,
                "productNumber": "102081"
            }
        }
    }]
    
    df = pd.DataFrame(sample_json_data)
    df['day'] = df['day'].astype('int64')
    df['month'] = df['month'].astype('int64')
    df['year'] = df['year'].astype('int64')

    table = pa.Table.from_pandas(df, schema=schema)
    
    try:
        with io.BytesIO() as f:
            pq.write_table(table, f, coerce_timestamps='ms')
            f.seek(0)
            blob_name = f"{eventId}_.parquet"
            # Use ThreadPoolExecutor to run the upload in a separate thread
            await asyncio.get_event_loop().run_in_executor(
                executor, upload_to_gcs, bucket, blob_name, f.getvalue())
    except Exception as e:
        print(f'Error on iteration {i}: {e}')

async def main():
    n_rows = 3  # Number of rows
    
    # Set up GCP credentials and client
    credentials = service_account.Credentials.from_service_account_file(gcp_credentials_path)
    storage_client = storage.Client(credentials=credentials)
    bucket = storage_client.bucket(gcp_bucket_name)
    
    # Create a ThreadPoolExecutor for running GCS uploads
    with ThreadPoolExecutor(max_workers=10) as executor:
        tasks = [asyncio.ensure_future(generate_and_upload_data(i, bucket, executor)) for i in range(n_rows)]
        await asyncio.gather(*tasks)

# Run the main coroutine
if __name__ == "__main__":
    before = datetime.now()
    print("Start time:", before)

    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

    after = datetime.now()
    print("End time:", after)
    print("Elapsed time:", after - before)

Start time: 2024-10-18 08:28:42.700483
Successfully uploaded file to GCS: yk53_av8o_.parquet
Successfully uploaded file to GCS: i1qp_3mdc_.parquet
Successfully uploaded file to GCS: 8j9f_c4hd_.parquet
End time: 2024-10-18 08:28:43.941967
Elapsed time: 0:00:01.241484


In [4]:
import asyncio
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import random
import string
from faker import Faker
from datetime import datetime
import io
import nest_asyncio
from google.cloud import storage
from google.oauth2 import service_account
from concurrent.futures import ThreadPoolExecutor

# Helper function to generate random IDs
def id_generator(size=11, chars=string.ascii_uppercase + string.digits):
    return ''.join(random.SystemRandom().choice(string.ascii_lowercase + string.digits) for _ in range(size))

# Generate project IDs
projects = [f"{id_generator(4)}" for _ in range(4)]

# Generate tenant IDs
Faker.seed(100)
fake = Faker()
tenants = [id_generator(4) for _ in range(10)]

events = ["unfreeze", "depressurize", "warm", "", "pressurize", "normalize"]

# GCP configuration
gcp_bucket_name = "myfirstproject-inputbucket"
gcp_credentials_path = "double-balm-438113-f5-53067ace2784.json"  # Atualize com o caminho correto

nest_asyncio.apply()

# Define schema
schema = pa.schema([
    ('day', pa.int64()),
    ('month', pa.int64()),
    ('year', pa.int64()),
    ('tenantId', pa.string()),
    ('eventType', pa.string()),
    ('eventId', pa.string()),
    ('data', pa.struct([
        ('Fornecedor', pa.struct([
            ('Nome_Fazenda', pa.string()),
            ('Proprietario', pa.string()),
            ('CNPJ_CPF', pa.string()),
            ('Endereco', pa.string()),
            ('Telefone', pa.string()),
            ('Email', pa.string())
        ])),
        ('MateriaPrima', pa.struct([
            ('Tipo_MateriaPrima', pa.string()),
            ('Quantidade_Recebida', pa.float64()),
            ('Data_Recebimento', pa.timestamp('ms')),
            ('Qualidade', pa.string())  # Alterado para string
        ])),
        ('Produto', pa.struct([
            ('Nome_Produto', pa.string()),
            ('Categoria', pa.string()),
            ('Descricao', pa.string()),
            ('Unidade_Medida', pa.string()),
            ('Preco_Unitario', pa.float64()),
            ('Prazo_Validade', pa.int64())
        ])),
        ('Funcionario', pa.struct([
            ('Nome', pa.string()),
            ('Cargo', pa.string()),
            ('Departamento', pa.string()),
            ('Data_Contratacao', pa.timestamp('ms')),
            ('Salario', pa.float64()),
            ('Contato_Emergencia', pa.string())
        ]))
    ]))
])

equipamentos_laticinios = [
    "Tanques de recepção e armazenamento de leite cru",
    "Sistemas de filtração e clarificação",
    "Pasteurizadores (placas ou tubulares)",
    "Homogeneizadores",
    "Tanques de processamento UHT",
    "Fermentadores para iogurte",
    "Tanques de cultura para queijos",
    "Prensas para queijo",
    "Formas para queijo",
    "Batedeiras para manteiga",
    "Desnatadeiras",
    "Tanques de maturação para queijos",
    "Câmaras de maturação para queijos",
    "Equipamentos de corte e moldagem de queijos",
    "Embaladoras para diversos produtos",
    "Tanques de mistura para bebidas lácteas",
    "Sistemas CIP (Clean-in-Place)",
    "Caldeiras para geração de vapor",
    "Torres de resfriamento",
    "Compressores de ar",
    "Sistemas de refrigeração",
    "Bombas centrífugas sanitárias",
    "Trocadores de calor",
    "Evaporadores (para produção de leite condensado e creme de leite)",
    "Tanques de cristalização (para requeijão)",
    "Sistemas de automação e controle",
    "Equipamentos de laboratório para controle de qualidade",
    "Esteiras transportadoras",
    "Empilhadeiras",
    "Câmaras frias para armazenamento de produtos acabados"
]

def upload_to_gcs(bucket, blob_name, data):
    blob = bucket.blob(blob_name)
    blob.upload_from_string(data)
    print(f'Successfully uploaded file to GCS: {blob_name}')

async def generate_and_upload_data(i, bucket, executor):
    day = fake.day_of_month()
    month = fake.month()
    year = fake.year()
    tenantId = random.choice(tenants)
    project = random.choice(projects)
    eventType = random.choice(events)
    eventId = f"{tenantId}_{id_generator(4)}"
    update_time = fake.date_time_this_year()
    
    sample_json_data = [{
        "day": day,
        "month": month,
        "year": year,
        "tenantId": tenantId,
        "eventType": eventType,
        "eventId": eventId,
        "data": {
            "Fornecedor": {
                "Nome_Fazenda": fake.company(),
                "Proprietario": fake.name(),
                "CNPJ_CPF": id_generator(),
                "Endereco": fake.address(),
                "Telefone": fake.phone_number(),
                "Email": fake.email()
            },
            "MateriaPrima": {
                "Tipo_MateriaPrima": "Leite",
                "Quantidade_Recebida": round(random.uniform(1000.0, 5000.0), 2),
                "Data_Recebimento": update_time,
                "Qualidade": '{"acidez": 0.12, "bacterias": 250}'  # Exemplo de JSON
            },
            "Produto": {
                "Nome_Produto": "Leite Integral",
                "Categoria": "Laticínios",
                "Descricao": "Leite integral fresco.",
                "Unidade_Medida": "litro",
                "Preco_Unitario": round(random.uniform(2.0, 5.0), 2),
                "Prazo_Validade": random.randint(5, 30)
            },
            "Funcionario": {
                "Nome": fake.name(),
                "Cargo": "Operador",
                "Departamento": "Produção",
                "Data_Contratacao": update_time,
                "Salario": round(random.uniform(2000.0, 5000.0), 2),
                "Contato_Emergencia": fake.name() + ", " + fake.phone_number()
            },
            "equipamento": {
                "id_equipamento": id_generator(7),
                "nome_equipamento": random.choice(equipamentos_laticinios),
                "data_aquisicao": update_time
            }
        }
    }]
    
    df = pd.DataFrame(sample_json_data)
    df['day'] = df['day'].astype('int64')
    df['month'] = df['month'].astype('int64')
    df['year'] = df['year'].astype('int64')

    table = pa.Table.from_pandas(df, schema=schema)
    
    try:
        with io.BytesIO() as f:
            pq.write_table(table, f, coerce_timestamps='ms')
            f.seek(0)
            blob_name = f"{eventId}_.parquet"
            await asyncio.get_event_loop().run_in_executor(
                executor, upload_to_gcs, bucket, blob_name, f.getvalue())
    except Exception as e:
        print(f'Error on iteration {i}: {e}')

async def main():
    n_rows = 3  # Number of rows
    
    credentials = service_account.Credentials.from_service_account_file(gcp_credentials_path)
    storage_client = storage.Client(credentials=credentials)
    bucket = storage_client.bucket(gcp_bucket_name)
    
    with ThreadPoolExecutor(max_workers=10) as executor:
        tasks = [asyncio.ensure_future(generate_and_upload_data(i, bucket, executor)) for i in range(n_rows)]
        await asyncio.gather(*tasks)

# Run the main coroutine
if __name__ == "__main__":
    before = datetime.now()
    print("Start time:", before)

    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

    after = datetime.now()
    print("End time:", after)
    print("Elapsed time:", after - before)


Start time: 2024-10-18 21:34:57.512159
Successfully uploaded file to GCS: 0grd_nhvq_.parquet
Successfully uploaded file to GCS: 3hxl_j2wz_.parquet
Successfully uploaded file to GCS: m53r_mayt_.parquet
End time: 2024-10-18 21:34:58.744331
Elapsed time: 0:00:01.232172


In [3]:
import asyncio
import pandas as pd
import random
import string
from faker import Faker
from datetime import datetime
import nest_asyncio
from google.cloud import storage
from google.oauth2 import service_account
from concurrent.futures import ThreadPoolExecutor
import json

# Helper function to generate random IDs
def id_generator(size=11, chars=string.ascii_uppercase + string.digits):
    return ''.join(random.SystemRandom().choice(string.ascii_lowercase + string.digits) for _ in range(size))

# Generate project IDs
projects = [f"{id_generator(4)}" for _ in range(4)]

# Generate tenant IDs
Faker.seed(100)
fake = Faker()
tenants = [id_generator(4) for _ in range(10)]

events = ["unfreeze", "depressurize", "warm", "", "pressurize", "normalize"]

# GCP configuration
gcp_bucket_name = "myfirstproject-inputbucket"
gcp_credentials_path = "double-balm-438113-f5-53067ace2784.json"  # Atualize com o caminho correto

nest_asyncio.apply()

equipamentos_laticinios = [
    "Tanques de recepção e armazenamento de leite cru",
    "Sistemas de filtração e clarificação",
    "Pasteurizadores (placas ou tubulares)",
    "Homogeneizadores",
    "Tanques de processamento UHT",
    "Fermentadores para iogurte",
    "Tanques de cultura para queijos",
    "Prensas para queijo",
    "Formas para queijo",
    "Batedeiras para manteiga",
    "Desnatadeiras",
    "Tanques de maturação para queijos",
    "Câmaras de maturação para queijos",
    "Equipamentos de corte e moldagem de queijos",
    "Embaladoras para diversos produtos",
    "Tanques de mistura para bebidas lácteas",
    "Sistemas CIP (Clean-in-Place)",
    "Caldeiras para geração de vapor",
    "Torres de resfriamento",
    "Compressores de ar",
    "Sistemas de refrigeração",
    "Bombas centrífugas sanitárias",
    "Trocadores de calor",
    "Evaporadores (para produção de leite condensado e creme de leite)",
    "Tanques de cristalização (para requeijão)",
    "Sistemas de automação e controle",
    "Equipamentos de laboratório para controle de qualidade",
    "Esteiras transportadoras",
    "Empilhadeiras",
    "Câmaras frias para armazenamento de produtos acabados"
]

def upload_to_gcs(bucket, blob_name, data):
    blob = bucket.blob(blob_name)
    blob.upload_from_string(data)
    print(f'Successfully uploaded file to GCS: {blob_name}')

async def generate_and_upload_data(i, bucket, executor):
    day = fake.day_of_month()
    month = fake.month()
    year = fake.year()
    tenantId = random.choice(tenants)
    project = random.choice(projects)
    eventType = random.choice(events)
    eventId = f"{tenantId}_{id_generator(4)}"
    update_time = fake.date_time_this_year()
    
    evento = {
        "day": day,
        "month": month,
        "year": year,
        "tenantId": tenantId,
        "eventType": eventType,
        "eventId": eventId
    }
    
    fornecedor = {
        "Nome_Fazenda": fake.company(),
        "Proprietario": fake.name(),
        "CNPJ_CPF": id_generator(),
        "Endereco": fake.address(),
        "Telefone": fake.phone_number(),
        "Email": fake.email()
    }
    
    materia_prima = {
        "Tipo_MateriaPrima": "Leite",
        "Quantidade_Recebida": round(random.uniform(1000.0, 5000.0), 2),
        "Data_Recebimento": update_time.isoformat(),
        "Qualidade": json.dumps({"acidez": round(random.uniform(0.1, 0.2), 2), "bacterias": random.randint(200, 300)})
    }
    
    produto = {
        "Nome_Produto": "Leite Integral",
        "Categoria": "Laticínios",
        "Descricao": "Leite integral fresco.",
        "Unidade_Medida": "litro",
        "Preco_Unitario": round(random.uniform(2.0, 5.0), 2),
        "Prazo_Validade": random.randint(5, 30)
    }
    
    funcionario = {
        "Nome": fake.name(),
        "Cargo": "Operador",
        "Departamento": "Produção",
        "Data_Contratacao": update_time.isoformat(),
        "Salario": round(random.uniform(2000.0, 5000.0), 2),
        "Contato_Emergencia": fake.name() + ", " + fake.phone_number()
    }
    
    equipamento = {
        "id_equipamento": id_generator(7),
        "nome_equipamento": random.choice(equipamentos_laticinios),
        "data_aquisicao": update_time.isoformat()
    }
    
    data = {
        "evento": evento,
        "fornecedor": fornecedor,
        "materia_prima": materia_prima,
        "produto": produto,
        "funcionario": funcionario,
        "equipamento": equipamento
    }
    
    try:
        json_data = json.dumps(data)
        blob_name = f"{eventId}_.json"
        await asyncio.get_event_loop().run_in_executor(
            executor, upload_to_gcs, bucket, blob_name, json_data)
    except Exception as e:
        print(f'Error on iteration {i}: {e}')

async def main():
    n_rows = 5  # Number of rows
    
    credentials = service_account.Credentials.from_service_account_file(gcp_credentials_path)
    storage_client = storage.Client(credentials=credentials)
    bucket = storage_client.bucket(gcp_bucket_name)
    
    with ThreadPoolExecutor(max_workers=10) as executor:
        tasks = [asyncio.ensure_future(generate_and_upload_data(i, bucket, executor)) for i in range(n_rows)]
        await asyncio.gather(*tasks)

# Run the main coroutine
if __name__ == "__main__":
    before = datetime.now()
    print("Start time:", before)

    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

    after = datetime.now()
    print("End time:", after)
    print("Elapsed time:", after - before)

Start time: 2024-10-19 15:43:10.967580
Successfully uploaded file to GCS: isnf_1mor_.json
Successfully uploaded file to GCS: s5b6_hlhl_.json
Successfully uploaded file to GCS: ohre_xwn6_.json
Successfully uploaded file to GCS: or1g_pu2j_.json
Successfully uploaded file to GCS: tdjy_xh60_.json
End time: 2024-10-19 15:43:12.199885
Elapsed time: 0:00:01.232305


In [58]:
import asyncio
import pandas as pd
import random
import string
from faker import Faker
from datetime import datetime
import nest_asyncio
from google.cloud import storage
from google.oauth2 import service_account
from concurrent.futures import ThreadPoolExecutor
import pyarrow as pa
import pyarrow.parquet as pq
import json
import io

# Helper function to generate random IDs
def id_generator(size=11, chars=string.ascii_uppercase + string.digits):
    return ''.join(random.SystemRandom().choice(string.ascii_lowercase + string.digits) for _ in range(size))

# Generate project IDs
projects = [f"{id_generator(4)}" for _ in range(4)]

# Generate tenant IDs
Faker.seed(100)
fake = Faker()
tenants = [id_generator(4) for _ in range(10)]

# GCP configuration
gcp_bucket_name = "myfirstproject-inputbucket"
gcp_credentials_path = "double-balm-438113-f5-53067ace2784.json"  # Atualize com o caminho correto

nest_asyncio.apply()

equipamentos_produtos = {
    "Tanques de recepção e armazenamento de leite cru": {
        "produto": "Leite Integral",
        "categoria": "Laticínios",
        "descricao": "Leite integral fresco.",
        "unidade_medida": "litro",
        "preco_unitario": 3.50
    },
    "Fermentadores para iogurte": {
        "produto": "Iogurte Natural",
        "categoria": "Laticínios",
        "descricao": "Iogurte natural fermentado.",
        "unidade_medida": "litro",
        "preco_unitario": 6.00
    },
    "Prensas para queijo": {
        "produto": "Queijo Mussarela",
        "categoria": "Laticínios",
        "descricao": "Queijo mussarela fresco.",
        "unidade_medida": "quilo",
        "preco_unitario": 20.00
    }}
events = ["unfreeze", "depressurize", "warm", "", "pressurize", "normalize"]
equipamento_status = ["ativo"] * 8 + ["quebrado", "alerta", "inativo", "manutenção"] * 2

departamentos = [
    "Gestão de Fornecedores",
    "Controle de Matéria-Prima",
    "producao",
    "Gestão de Produtos",
    "Gestão de Estoque",
    "Gestão de Funcionários",
    "Gestão de Equipamentos"]
def upload_to_gcs(bucket, blob_name, data):
    blob = bucket.blob(blob_name)
    blob.upload_from_string(data)
    print(f'Successfully uploaded file to GCS: {blob_name}')

async def generate_and_upload_data(i, bucket, executor):
    day = fake.day_of_month()
    month = fake.month()
    year = fake.year()
    tenantId = random.choice(tenants)
    project = random.choice(projects)
    eventType = random.choice(events)
    eventId = f"{tenantId}_{id_generator(4)}"
    update_time = fake.date_time_this_year()

    # Escolhendo o equipamento e seu produto associado
    equipamento_escolhido = random.choice(list(equipamentos_produtos.keys()))
    produto_info = equipamentos_produtos[equipamento_escolhido]
    status = random.choice(equipamento_status)
    data = {
        
        # Informações sobre o equipamento
        "id_equipamento": [id_generator(7)],
        "nome_equipamento": [equipamento_escolhido],
        "data_aquisicao": [update_time],
        "eqp_status" : [status],
        "eventType": ['NULL' if status in ["quebrado", "inativo"] else random.choice(events)],
        
        "day": [day],
        "month": [month],
        "year": [year],
        "tenantId": [tenantId],
        "eventId": [eventId],
        "Nome_Fazenda": [fake.company()],
        "Proprietario": [fake.name()],
        "CNPJ_CPF": [id_generator()],
        "Endereco": [fake.address()],
        "Telefone": [fake.phone_number()],
        "Email": [fake.email()],
        
        #materia prima
        "Tipo_MateriaPrima": ["Leite"],
        "Quantidade_Recebida": [round(random.uniform(1000.0, 5000.0), 2)],
        "Data_Recebimento": [update_time],
        "Qualidade": [json.dumps({"acidez": round(random.uniform(0.1, 0.2), 2), "bacterias": random.randint(200, 300)})],
        
        # Informações do produto relacionadas ao equipamento
        "Nome_Produto": [produto_info["produto"]],
        "Categoria": [produto_info["categoria"]],
        "Descricao": [produto_info["descricao"]],
        "Unidade_Medida": [produto_info["unidade_medida"]],
        "Preco_Unitario": [produto_info["preco_unitario"]],
        "Prazo_Validade": [random.randint(5, 30)],
        
        #estoque
        
        "quantidade_estoque": [round(random.uniform(0.0, 20000.0), 0)],

        # Informações funcionario
        "Nome_Funcionario": [fake.name()],
        "Cargo": ["Operador"],
        "Departamento": ["Produção"],
        "Data_Contratacao": [update_time],
        "Salario": [round(random.uniform(2000.0, 5000.0), 2)],
        "Contato_Emergencia": [fake.name() + ", " + fake.phone_number()],
        
        # setor
        
        "nome_setor": ["producao"],
        "id_setor": [1]

        
    }

    df = pd.DataFrame(data)
    table = pa.Table.from_pandas(df)
    
    try:
        buffer = io.BytesIO()
        pq.write_table(table, buffer)
        buffer.seek(0)
        blob_name = f"{eventId}_.parquet"
        await asyncio.get_event_loop().run_in_executor(
            executor, upload_to_gcs, bucket, blob_name, buffer.getvalue())
    except Exception as e:
        print(f'Error on iteration {i}: {e}')


async def main():
    n_rows = 50000  # Number of rows
    
    credentials = service_account.Credentials.from_service_account_file(gcp_credentials_path)
    storage_client = storage.Client(credentials=credentials)
    bucket = storage_client.bucket(gcp_bucket_name)
    
    with ThreadPoolExecutor(max_workers=10) as executor:
        tasks = [asyncio.ensure_future(generate_and_upload_data(i, bucket, executor)) for i in range(n_rows)]
        await asyncio.gather(*tasks)

# Run the main coroutine
if __name__ == "__main__":
    before = datetime.now()
    print("Start time:", before)

    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

    after = datetime.now()
    print("End time:", after)
    print("Elapsed time:", after - before)

Start time: 2024-10-19 22:29:09.858222
Successfully uploaded file to GCS: k1es_ali6_.parquet
Successfully uploaded file to GCS: bwfg_qqzy_.parquet
Successfully uploaded file to GCS: n4vi_3c8d_.parquet
Successfully uploaded file to GCS: f02z_xnn3_.parquet
Successfully uploaded file to GCS: bwfg_uqtt_.parquet
Successfully uploaded file to GCS: idof_comu_.parquet
Successfully uploaded file to GCS: hxj7_ykap_.parquet
Successfully uploaded file to GCS: hxj7_4zlv_.parquet
Successfully uploaded file to GCS: hxj7_hsfk_.parquet
Successfully uploaded file to GCS: ou4z_vx7d_.parquet
Successfully uploaded file to GCS: hxj7_oyzj_.parquet
Successfully uploaded file to GCS: bwfg_obmz_.parquet
Successfully uploaded file to GCS: idof_3kpy_.parquet
Successfully uploaded file to GCS: nuj8_p5zi_.parquet
Successfully uploaded file to GCS: f02z_qzre_.parquet
Successfully uploaded file to GCS: nuj8_ru1u_.parquet
Successfully uploaded file to GCS: hxj7_i5u9_.parquet
Successfully uploaded file to GCS: k1es_phv

In [11]:
type(fake.year())

str

In [None]:
fake.day_of_month()

In [15]:
fake.name() + ", " + fake.phone_number()

'Mackenzie Taylor, 001-598-214-2199'

In [34]:
equipamento_status =  ["quebrado", "alerta",'ativo', 'inativo', 'manutenção']
status = random.choice(equipamento_status)
events = ["unfreeze", "depressurize", "warm", "", "pressurize", "normalize"]

ex_data = {
    "eventType": 'NULL' if status in ["quebrado", "inativo"] else random.choice(events),
    "status": status
}

print(ex_data)

{'eventType': 'depressurize', 'status': 'alerta'}


In [43]:

# Define the possible statuses with weighted occurrences
equipamento_status = ["ativo"] * 8 + ["quebrado", "alerta", "inativo", "manutenção"] * 2
status = random.choice(equipamento_status)
events = ["unfreeze", "depressurize", "warm", "", "pressurize", "normalize"]

# Use a conditional expression to set the eventType
ex_data = {
    "eventType": 'NULL' if status in ["quebrado", "inativo"] else random.choice(events),
    "status": status
}

print(ex_data)

{'eventType': 'warm', 'status': 'ativo'}


In [44]:
equipamento_status

['ativo',
 'ativo',
 'ativo',
 'ativo',
 'ativo',
 'ativo',
 'ativo',
 'ativo',
 'quebrado',
 'alerta',
 'inativo',
 'manutenção',
 'quebrado',
 'alerta',
 'inativo',
 'manutenção']

In [51]:
df = pd.read_parquet("ljk9_wq8d_.parquet")

In [53]:
print(df)

  id_equipamento            nome_equipamento             data_aquisicao  \
0        wnafrbp  Fermentadores para iogurte 2024-07-25 14:28:10.703293   

  eqp_status   eventType day month  year tenantId    eventId  ...  \
0     alerta  pressurize  25    12  2012     ljk9  ljk9_wq8d  ...   

  Prazo_Validade quantidade_estoque Nome_Funcionario     Cargo Departamento  \
0             26            13927.0    Jerome Little  Operador     Produção   

            Data_Contratacao Salario        Contato_Emergencia nome_setor  \
0 2024-07-25 14:28:10.703293  3423.6  Beth Spencer, 7937984958   producao   

  id_setor  
0        1  

[1 rows x 35 columns]


In [54]:
table = pa.Table.from_pandas(df)


In [57]:
table.schema

id_equipamento: string
nome_equipamento: string
data_aquisicao: timestamp[ns]
eqp_status: string
eventType: string
day: string
month: string
year: string
tenantId: string
eventId: string
Nome_Fazenda: string
Proprietario: string
CNPJ_CPF: string
Endereco: string
Telefone: string
Email: string
Tipo_MateriaPrima: string
Quantidade_Recebida: double
Data_Recebimento: timestamp[ns]
Qualidade: string
Nome_Produto: string
Categoria: string
Descricao: string
Unidade_Medida: string
Preco_Unitario: double
Prazo_Validade: int64
quantidade_estoque: double
Nome_Funcionario: string
Cargo: string
Departamento: string
Data_Contratacao: timestamp[ns]
Salario: double
Contato_Emergencia: string
nome_setor: string
id_setor: int64
-- schema metadata --
pandas: '{"index_columns": [{"kind": "range", "name": null, "start": 0, "' + 4556