# Desafios de IA e Ciência de Dados na Indústria
## Preparação para Prova Dissertativa - Python Básico

Este notebook contém 30 desafios práticos envolvendo cenários reais da indústria com Inteligência Artificial e Ciência de Dados, utilizando apenas Python básico.

**Tópicos abordados:**
- Estruturas condicionais (if-elif-else)
- Refatoração com match-case
- Laços de repetição (for e while)
- Estruturas de dados (listas, tuplas, dicionários)
- Compreensão de listas
- Leitura e escrita de arquivos
- Análise de dados sem bibliotecas externas

**Instruções:**
1. Execute as células de geração de dados primeiro
2. Leia atentamente cada desafio
3. Complete o código nos blocos marcados com `# SEU CÓDIGO AQUI`
4. Teste suas soluções executando as células


## GERAÇÃO DE DADOS PARA OS DESAFIOS

Execute esta célula primeiro para gerar todos os arquivos de dados necessários para os desafios.


In [None]:
import os
import json
import csv
import random
from datetime import datetime, timedelta

# Criar diretório para arquivos
if not os.path.exists('dados'):
    os.makedirs('dados')

# 1. Dados de sensores IoT em uma fábrica
sensores_data = []
for i in range(1000):
    timestamp = datetime.now() - timedelta(hours=random.randint(0, 720))
    sensor_id = f"SENSOR_{random.randint(1, 50):03d}"
    temperatura = round(random.normalvariate(25, 5), 2)
    umidade = round(random.normalvariate(60, 15), 2)
    pressao = round(random.normalvariate(1013, 20), 2)
    status = random.choice(['OK', 'ALERTA', 'CRITICO'])

    sensores_data.append({
        'timestamp': timestamp.strftime('%Y-%m-%d %H:%M:%S'),
        'sensor_id': sensor_id,
        'temperatura': temperatura,
        'umidade': umidade,
        'pressao': pressao,
        'status': status
    })

# Salvar dados de sensores
with open('dados/sensores_iot.json', 'w') as f:
    json.dump(sensores_data, f, indent=2)

# 2. Dados de produção industrial
producao_data = []
linhas_producao = ['LINHA_A', 'LINHA_B', 'LINHA_C', 'LINHA_D']
produtos = ['PRODUTO_X1', 'PRODUTO_X2', 'PRODUTO_X3', 'PRODUTO_Y1', 'PRODUTO_Y2']

for i in range(500):
    data = (datetime.now() - timedelta(days=random.randint(0, 90))).strftime('%Y-%m-%d')
    linha = random.choice(linhas_producao)
    produto = random.choice(produtos)
    quantidade = random.randint(100, 1000)
    defeitos = random.randint(0, 50)
    tempo_producao = round(random.normalvariate(8, 2), 2)

    producao_data.append([data, linha, produto, quantidade, defeitos, tempo_producao])

# Salvar dados de produção
with open('dados/producao_industrial.csv', 'w', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(['data', 'linha_producao', 'produto', 'quantidade', 'defeitos', 'tempo_producao_horas'])
    writer.writerows(producao_data)

# 3. Dados de qualidade de produtos
qualidade_data = []
for i in range(300):
    lote_id = f"LOTE_{random.randint(1000, 9999)}"
    peso = round(random.normalvariate(500, 50), 2)
    dimensao_x = round(random.normalvariate(10, 0.5), 2)
    dimensao_y = round(random.normalvariate(15, 0.8), 2)
    cor_score = round(random.uniform(0, 100), 2)
    dureza = round(random.normalvariate(75, 10), 2)
    aprovado = random.choice([True, False])

    qualidade_data.append({
        'lote_id': lote_id,
        'peso': peso,
        'dimensao_x': dimensao_x,
        'dimensao_y': dimensao_y,
        'cor_score': cor_score,
        'dureza': dureza,
        'aprovado': aprovado
    })

with open('dados/controle_qualidade.json', 'w') as f:
    json.dump(qualidade_data, f, indent=2)

# 4. Dados de manutenção preditiva
manutencao_data = []
equipamentos = ['MOTOR_001', 'MOTOR_002', 'BOMBA_001', 'BOMBA_002', 'COMPRESSOR_001']

for equip in equipamentos:
    for i in range(200):
        timestamp = (datetime.now() - timedelta(hours=random.randint(0, 2000))).strftime('%Y-%m-%d %H:%M:%S')
        vibracao = round(random.normalvariate(2.5, 1.0), 3)
        temperatura_motor = round(random.normalvariate(70, 15), 2)
        corrente = round(random.normalvariate(15, 3), 2)
        horas_operacao = random.randint(1000, 50000)
        falha_proximas_72h = random.choice([0, 1])

        manutencao_data.append([timestamp, equip, vibracao, temperatura_motor, corrente, horas_operacao, falha_proximas_72h])

with open('dados/manutencao_preditiva.csv', 'w', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(['timestamp', 'equipamento', 'vibracao', 'temperatura', 'corrente', 'horas_operacao', 'falha_72h'])
    writer.writerows(manutencao_data)

print("Dados gerados com sucesso!")
print("Arquivos criados:")
print("- dados/sensores_iot.json")
print("- dados/producao_industrial.csv")
print("- dados/controle_qualidade.json")
print("- dados/manutencao_preditiva.csv")


Dados gerados com sucesso!
Arquivos criados:
- dados/sensores_iot.json
- dados/producao_industrial.csv
- dados/controle_qualidade.json
- dados/manutencao_preditiva.csv


## REVISÃO TEÓRICA COM EXEMPLOS PRÁTICOS

### Estruturas Condicionais
Exemplo: Sistema de classificação de temperatura de sensor


In [None]:
# Exemplo: Sistema de classificação de temperatura de sensor
temperatura_sensor = 28.5

if temperatura_sensor < 0:
    status = "CONGELANDO"
elif temperatura_sensor < 20:
    status = "FRIO"
elif temperatura_sensor < 30:
    status = "NORMAL"
elif temperatura_sensor < 40:
    status = "QUENTE"
else:
    status = "SUPERAQUECIMENTO"

print(f"Temperatura: {temperatura_sensor}°C - Status: {status}")

# Exemplo com match-case (Python 3.10+)
def classificar_produto(codigo):
    match codigo[0]:
        case 'A':
            return "Produto de Alta Qualidade"
        case 'B':
            return "Produto de Qualidade Padrão"
        case 'C':
            return "Produto Econômico"
        case _:
            return "Código Inválido"

print(f"Produto A123: {classificar_produto('A123')}")


Temperatura: 28.5°C - Status: NORMAL
Produto A123: Produto de Alta Qualidade


### Laços de Repetição
Exemplo: Análise de dados de sensores com for e while


In [None]:
# Exemplo for: Análise de dados de sensores
sensores = [23.5, 24.1, 25.3, 26.8, 24.9]
total = 0
for temperatura in sensores:
    total += temperatura
    print(f"Temperatura atual: {temperatura}°C")

media = total / len(sensores)
print(f"Temperatura média: {media:.2f}°C")

# Exemplo while: Monitoramento até condição específica
contador = 0
while contador < 5:
    print(f"Verificação {contador + 1} do sistema")
    contador += 1


Temperatura atual: 23.5°C
Temperatura atual: 24.1°C
Temperatura atual: 25.3°C
Temperatura atual: 26.8°C
Temperatura atual: 24.9°C
Temperatura média: 24.92°C
Verificação 1 do sistema
Verificação 2 do sistema
Verificação 3 do sistema
Verificação 4 do sistema
Verificação 5 do sistema


### Estruturas de Dados e Compreensão de Listas
Exemplos práticos com listas, tuplas, dicionários e list comprehensions


In [None]:
# Lista: Dados mutáveis e ordenados
producao_diaria = [150, 200, 175, 230, 190]
producao_diaria.append(210)  # Adicionar novo dia
print(f"Produção semanal: {producao_diaria}")

# Tupla: Dados imutáveis (coordenadas, configurações)
configuracao_maquina = ("MOTOR_X1", 1500, "RPM", 75.5)
print(f"Configuração: {configuracao_maquina}")

# Dicionário: Dados com chave-valor
sensor_data = {
    'id': 'TEMP_001',
    'valor': 28.3,
    'unidade': 'Celsius',
    'status': 'OK'
}
print(f"Sensor: {sensor_data['id']} = {sensor_data['valor']}°{sensor_data['unidade']}")

# Exemplo: Filtrar sensores com temperatura alta
temperaturas = [22.1, 28.5, 31.2, 24.8, 35.1, 29.3, 33.4]

# Método tradicional
temperaturas_altas = []
for temp in temperaturas:
    if temp > 30:
        temperaturas_altas.append(temp)

# Com compreensão de lista
temperaturas_altas_comp = [temp for temp in temperaturas if temp > 30]

print(f"Temperaturas altas (tradicional): {temperaturas_altas}")
print(f"Temperaturas altas (compreensão): {temperaturas_altas_comp}")

# Transformação de dados
temperaturas_fahrenheit = [(temp * 9/5) + 32 for temp in temperaturas]
print(f"Em Fahrenheit: {temperaturas_fahrenheit}")


Produção semanal: [150, 200, 175, 230, 190, 210]
Configuração: ('MOTOR_X1', 1500, 'RPM', 75.5)
Sensor: TEMP_001 = 28.3°Celsius
Temperaturas altas (tradicional): [31.2, 35.1, 33.4]
Temperaturas altas (compreensão): [31.2, 35.1, 33.4]
Em Fahrenheit: [71.78, 83.3, 88.16, 76.64, 95.18, 84.74, 92.11999999999999]


## DESAFIOS PRÁTICOS (1-10)

### Desafio 1: Sistema de Classificação de Sensores IoT

**Contexto:** Uma fábrica possui sensores IoT que monitoram temperatura, umidade e pressão. Você deve criar um sistema que classifica o status dos sensores baseado em suas leituras.

**Regras de classificação:**
- Temperatura: < 15°C (BAIXA), 15-35°C (NORMAL), > 35°C (ALTA)
- Umidade: < 40% (SECA), 40-70% (NORMAL), > 70% (ÚMIDA)
- Pressão: < 1000 hPa (BAIXA), 1000-1020 hPa (NORMAL), > 1020 hPa (ALTA)

Complete a função abaixo:


In [None]:
def classificar_sensor(temperatura, umidade, pressao):
    """
    Classifica as leituras de um sensor IoT

    Args:
        temperatura (float): Temperatura em Celsius
        umidade (float): Umidade relativa em %
        pressao (float): Pressão atmosférica em hPa

    Returns:
        dict: Dicionário com classificações de cada parâmetro
    """
    classificacao_temperatura = ""
    if temperatura < 15:
        classificacao_temperatura = "baixa"
    elif 15 <= temperatura <= 35:
        classificacao_temperatura = "normal"
    else:  # > 35
        classificacao_temperatura = "alta"

    classificacao_umidade = ""
    if umidade < 40:
        classificacao_umidade = "seca"
    elif 40 <= umidade <= 70:
        classificacao_umidade = "normal"
    else:  # > 70
        classificacao_umidade = "umida"

    classificacao_pressao = ""
    if pressao < 1000:
        classificacao_pressao = "baixa"
    elif 1000 <= pressao <= 1020:
        classificacao_pressao = "normal"
    else: # > 1020
        classificacao_pressao = "alta" # Adicionei a classificação "alta" para pressão, que faltava

    return {'temperatura': classificacao_temperatura, 'umidade': classificacao_umidade, 'pressao': classificacao_pressao}

    # Implemente a lógica de classificação usando if-elif-else
    pass

# Teste sua função
resultado = classificar_sensor(28.5, 65.2, 1015.3)
print(f"Resultado esperado: {{'temperatura': 'NORMAL', 'umidade': 'NORMAL', 'pressao': 'NORMAL'}}")
print(f"Seu resultado: {resultado}")


Resultado esperado: {'temperatura': 'NORMAL', 'umidade': 'NORMAL', 'pressao': 'NORMAL'}
Seu resultado: {'temperatura': 'normal', 'umidade': 'normal', 'pressao': 'normal'}


### Desafio 2: Sistema de Classificação de Produtos com Match-Case

**Contexto:** Uma indústria classifica produtos baseado no código do lote. Refatore a função abaixo usando match-case (Python 3.10+).

**Regras:**
- Códigos iniciados com 'PR': Produto Premium
- Códigos iniciados com 'ST': Produto Standard  
- Códigos iniciados com 'EC': Produto Econômico
- Códigos iniciados com 'DF': Produto Defeituoso
- Outros códigos: Produto Não Classificado


### Desafio 3: Análise de Produção com Laços For

**Contexto:** Você deve analisar dados de produção diária de uma fábrica. Use laços for para calcular estatísticas de produção.

**Dados:** Lista com produção diária em unidades por 30 dias

### Desafio 4: Monitoramento com While

**Contexto:** Um sistema deve monitorar a qualidade de produtos em uma linha de produção. O sistema para quando encontra 3 produtos defeituosos consecutivos.

Use while para implementar o monitoramento.

### Desafio 5: Análise de Sensores com Dicionários

**Contexto:** Analise dados de múltiplos sensores organizados em dicionários. Identifique sensores com leituras anômalas.


In [None]:
def classificar_produto_match(codigo_produto):
    # Extrai os dois primeiros caracteres do código do produto para usar como prefixo
    prefixo = codigo_produto[:2]

    match prefixo:
        case 'PR':
            return "produto premium"
        case 'ST':
            return "produto standard" # Corrigi a grafia para 'standard'
        case 'EC':
            return "produto econômico" # Corrigi a grafia para 'econômico'
        case 'DF':
            return "produto defeituoso"
        case _: # O underscore (_) é o "coringa" para qualquer outro caso
            return "produto não classificado"
    # Use match-case para classificar baseado nos primeiros 2 caracteres
    pass

# Teste sua função
produtos_teste = ['PR001', 'ST045', 'EC123', 'DF999', 'XX000']
for produto in produtos_teste:
    resultado = classificar_produto_match(produto)
    print(f"Produto {produto}: {resultado}")


Produto PR001: produto premium
Produto ST045: produto standard
Produto EC123: produto econômico
Produto DF999: produto defeituoso
Produto XX000: produto não classificado


In [None]:
# DESAFIO 3: Análise de Produção Diária
producao_30_dias = [
    450, 523, 478, 601, 389, 445, 567, 623, 445, 501,
    478, 534, 612, 467, 523, 589, 445, 478, 534, 612,
    456, 523, 567, 445, 489, 534, 601, 478, 523, 567
]

def analisar_producao(dados_producao):
    """
    Analisa dados de produção e retorna estatísticas

    Args:
        dados_producao (list): Lista com produção diária

    Returns:
        dict: Dicionário com estatísticas calculadas
    """
    total = 0
    maximo = dados_producao[0]  # Inicializa com o primeiro valor
    minimo = dados_producao[0]  # Inicializa com o primeiro valor

    # Calcula total, máximo e mínimo
    for producao in dados_producao:
        total += producao
        if producao > maximo:
            maximo = producao
        if producao < minimo:
            minimo = producao

    media = total / len(dados_producao)

    dias_acima_media = 0
    # Conta os dias acima da média
    for producao in dados_producao:
        if producao > media:
            dias_acima_media += 1

    return {
        "total": total,
        "media": media,
        "maximo": maximo,
        "minimo": minimo,
        "dias_acima_da_media": dias_acima_media
    }

resultado = analisar_producao(producao_30_dias)
print(f"Estatísticas de produção: {resultado}")

# DESAFIO 4: Sistema de Monitoramento de Qualidade
produtos_linha = [
    'OK', 'OK', 'DEFEITO', 'OK', 'DEFEITO', 'DEFEITO', 'DEFEITO',
    'OK', 'OK', 'DEFEITO', 'OK', 'DEFEITO', 'DEFEITO', 'OK'
]

def monitorar_qualidade(produtos):
    """
    Monitora qualidade até encontrar 3 defeitos consecutivos

    Args:
        produtos (list): Lista com status dos produtos

    Returns:
        dict: Resultado do monitoramento
    """
    posicao_atual = 0
    total_analisado = 0
    defeitos_encontrados = 0
    defeitos_consecutivos = 0

    while posicao_atual < len(produtos):
        status = produtos[posicao_atual]
        total_analisado += 1

        if status == 'DEFEITO':
            defeitos_encontrados += 1
            defeitos_consecutivos += 1
        else:
            defeitos_consecutivos = 0 # Reinicia a contagem se encontrar um 'OK'

        # Verifica se encontrou 3 defeitos consecutivos
        if defeitos_consecutivos == 3:
            break # Interrompe o loop

        posicao_atual += 1

    return {
        "posicao_parada": posicao_atual,
        "total_analisado": total_analisado,
        "defeitos_encontrados": defeitos_encontrados
    }

resultado = monitorar_qualidade(produtos_linha)
print(f"Resultado do monitoramento: {resultado}")
#desafio 5

sensores_fabrica = {
    'TEMP_001': [22.1, 23.5, 24.2, 35.8, 23.9, 24.1],
    'TEMP_002': [25.3, 26.1, 24.8, 25.5, 26.2, 25.9],
    'HUMID_001': [45.2, 47.1, 85.3, 46.8, 47.5, 46.9],
    'HUMID_002': [52.1, 53.4, 51.8, 52.9, 53.1, 52.6],
    'PRESS_001': [1013.2, 1014.1, 1012.8, 1015.3, 1013.9, 1014.2]
}

def analisar_sensores(dados_sensores, limite_desvio=2.0):
    """
    Identifica sensores com leituras anômalas

    Args:
        dados_sensores (dict): Dicionário com dados dos sensores
        limite_desvio (float): Limite de desvio da média para considerar anômalo

    Returns:
        dict: Sensores com anomalias detectadas
    """
    anomalias_detectadas = {}

    for nome_sensor, leituras in dados_sensores.items():
        # Calcular a média das leituras para o sensor atual
        soma_leituras = 0
        for leitura in leituras:
            soma_leituras += leitura
        media = soma_leituras / len(leituras)

        leituras_anomalas_sensor = []
        # Identificar leituras que desviam mais que o limite da média
        for leitura in leituras:
            if abs(leitura - media) > limite_desvio:
                leituras_anomalas_sensor.append(leitura)

        # Se houver leituras anômalas, adiciona ao dicionário de resultados
        if leituras_anomalas_sensor:
            anomalias_detectadas[nome_sensor] = leituras_anomalas_sensor

    return anomalias_detectadas

anomalias = analisar_sensores(sensores_fabrica)
print(f"Anomalias detectadas: {anomalias}")

Estatísticas de produção: {'total': 15477, 'media': 515.9, 'maximo': 623, 'minimo': 389, 'dias_acima_da_media': 16}
Resultado do monitoramento: {'posicao_parada': 6, 'total_analisado': 7, 'defeitos_encontrados': 4}
Anomalias detectadas: {'TEMP_001': [22.1, 23.5, 35.8], 'HUMID_001': [45.2, 47.1, 85.3, 46.8, 47.5, 46.9]}


### Desafios 6-10: Estruturas Avançadas e Arquivos

**Desafio 6: Processamento de Tuplas**
- Processamento de configurações de máquinas armazenadas como tuplas

**Desafio 7: Compreensão de Listas - Filtragem de Dados**
- Filtragem de dados de produção usando list comprehension

**Desafio 8: Leitura de Arquivo JSON**
- Carregamento e análise de dados de sensores IoT

**Desafio 9: Leitura de Arquivo CSV**
- Análise de dados de produção industrial em CSV

**Desafio 10: Escrita de Arquivo - Relatório de Qualidade**
- Geração de relatório de controle de qualidade


In [2]:
# DESAFIO 6: Configurações de Máquinas
configuracoes_maquinas = [
    ('TORNO_001', 1500, 'RPM', 75.5, 'ATIVO'),
    ('FRESADORA_001', 2200, 'RPM', 68.2, 'ATIVO'),
    ('PRENSA_001', 500, 'KG', 82.1, 'MANUTENCAO'),
    ('TORNO_002', 1800, 'RPM', 71.3, 'ATIVO'),
    ('SOLDADORA_001', 220, 'V', 45.8, 'INATIVO')
]

def processar_configuracoes(configuracoes):
    """
    Processa configurações de máquinas

    Args:
        configuracoes (list): Lista de tuplas com configurações

    Returns:
        dict: Relatório processado das configurações
    """
    total_por_status = {}
    total_temperatura_ativas = 0
    cont_maquinas_ativas = 0

    maior_valor_config = -1 # Inicializa com um valor que será facilmente superado
    maquina_maior_valor = ""

    for nome, valor, unidade, temperatura, status in configuracoes:
        # 1. Total de máquinas por status
        total_por_status[status] = total_por_status.get(status, 0) + 1

        # 2. Temperatura média das ativas
        if status == 'ATIVO':
            total_temperatura_ativas += temperatura
            cont_maquinas_ativas += 1

        # 3. Máquina com maior valor de configuração
        if valor > maior_valor_config:
            maior_valor_config = valor
            maquina_maior_valor = nome

    media_temperatura_ativas = total_temperatura_ativas / cont_maquinas_ativas if cont_maquinas_ativas > 0 else 0

    return {
        "total_por_status": total_por_status,
        "temperatura_media_ativas": round(media_temperatura_ativas, 2), # Arredonda para 2 casas decimais
        "maquina_maior_valor_config": maquina_maior_valor
    }

relatorio = processar_configuracoes(configuracoes_maquinas)
print(f"Relatório de configurações: {relatorio}")

# DESAFIO 7: Filtragem de Dados de Produção
dados_producao_hora = [
    {'hora': 8, 'linha': 'A', 'producao': 45, 'defeitos': 2},
    {'hora': 9, 'linha': 'A', 'producao': 52, 'defeitos': 1},
    {'hora': 10, 'linha': 'A', 'producao': 48, 'defeitos': 3},
    {'hora': 8, 'linha': 'B', 'producao': 38, 'defeitos': 4},
    {'hora': 9, 'linha': 'B', 'producao': 41, 'defeitos': 2},
    {'hora': 10, 'linha': 'B', 'producao': 35, 'defeitos': 5},
]

def filtrar_producao_avancado(dados):
    """
    Filtra e transforma dados de produção usando compreensão de listas

    Args:
        dados (list): Lista de dicionários com dados de produção

    Returns:
        dict: Dados filtrados e transformados
    """
    # 1. Filtrar apenas registros com menos de 3 defeitos e calcular taxa de defeitos
    registros_sem_muitos_defeitos = [
        {**dado, 'taxa_defeitos': round((dado['defeitos'] / dado['producao']) * 100, 2)}
        for dado in dados if dado['defeitos'] < 3
    ]

    # 3. Criar lista apenas com produções da linha A
    producoes_linha_A = [dado['producao'] for dado in dados if dado['linha'] == 'A']

    # 4. Listar horas com produção acima de 40 unidades
    horas_producao_acima_40 = sorted(list(set([dado['hora'] for dado in dados if dado['producao'] > 40])))
    # Usamos set para garantir horas únicas e sorted para ordenar

    return {
        "registros_com_baixa_taxa_defeitos": registros_sem_muitos_defeitos,
        "producoes_linha_A": producoes_linha_A,
        "horas_com_producao_acima_40": horas_producao_acima_40
    }

resultado = filtrar_producao_avancado(dados_producao_hora)
print(f"Dados filtrados: {resultado}")

# DESAFIO 8: Análise de Dados de Sensores IoT
[
  {"id": "sensor_temp_01", "temperatura": 25.5, "timestamp": "2025-06-25T09:00:00Z", "status": "OK"},
  {"id": "sensor_umid_01", "temperatura": 24.1, "timestamp": "2025-06-25T09:01:00Z", "status": "ALERTA"},
  {"id": "sensor_temp_02", "temperatura": 27.8, "timestamp": "2025-06-25T09:02:00Z", "status": "OK"},
  {"id": "sensor_temp_01", "temperatura": 26.0, "timestamp": "2025-06-25T09:03:00Z", "status": "OK"},
  {"id": "sensor_press_01", "temperatura": 22.0, "timestamp": "2025-06-25T09:04:00Z", "status": "CRITICO"},
  {"id": "sensor_temp_01", "temperatura": 28.1, "timestamp": "2025-06-25T09:05:00Z", "status": "OK"},
  {"id": "sensor_temp_02", "temperatura": 26.5, "timestamp": "2025-06-25T09:06:00Z", "status": "ALERTA"},
  {"id": "sensor_press_01", "temperatura": 23.5, "timestamp": "2025-06-25T09:07:00Z", "status": "OK"}
]

import json
import os # Importar para criar diretório se não existir

# Garante que o diretório 'dados' existe
if not os.path.exists('dados'):
    os.makedirs('dados')

# Código para criar o arquivo sensores_iot.json se ele não existir
json_content = """
[
  {"id": "sensor_temp_01", "temperatura": 25.5, "timestamp": "2025-06-25T09:00:00Z", "status": "OK"},
  {"id": "sensor_umid_01", "temperatura": 24.1, "timestamp": "2025-06-25T09:01:00Z", "status": "ALERTA"},
  {"id": "sensor_temp_02", "temperatura": 27.8, "timestamp": "2025-06-25T09:02:00Z", "status": "OK"},
  {"id": "sensor_temp_01", "temperatura": 26.0, "timestamp": "2025-06-25T09:03:00Z", "status": "OK"},
  {"id": "sensor_press_01", "temperatura": 22.0, "timestamp": "2025-06-25T09:04:00Z", "status": "CRITICO"},
  {"id": "sensor_temp_01", "temperatura": 28.1, "timestamp": "2025-06-25T09:05:00Z", "status": "OK"},
  {"id": "sensor_temp_02", "temperatura": 26.5, "timestamp": "2025-06-25T09:06:00Z", "status": "ALERTA"},
  {"id": "sensor_press_01", "temperatura": 23.5, "timestamp": "2025-06-25T09:07:00Z", "status": "OK"}
]
"""
with open('dados/sensores_iot.json', 'w') as f:
    f.write(json_content)


def analisar_sensores_iot():
    """
    Carrega dados de sensores do arquivo JSON e analisa

    Returns:
        dict: Análise dos dados dos sensores
    """
    caminho_arquivo = 'dados/sensores_iot.json'

    with open(caminho_arquivo, 'r') as f:
        dados_sensores = json.load(f)

    sensores_unicos = set()
    total_temperatura = 0
    min_temperatura = float('inf')
    max_temperatura = float('-inf')
    cont_temperaturas = 0
    registros_por_status = {}
    registros_por_sensor = {}

    for registro in dados_sensores:
        # 2. Conte quantos sensores únicos existem
        sensores_unicos.add(registro['id'])

        # 3. Calcule temperatura média, mínima e máxima
        if 'temperatura' in registro: # Garante que o campo existe
            temp = registro['temperatura']
            total_temperatura += temp
            cont_temperaturas += 1
            if temp < min_temperatura:
                min_temperatura = temp
            if temp > max_temperatura:
                max_temperatura = temp

        # 4. Conte registros por status
        status = registro.get('status', 'DESCONHECIDO') # Usa .get para evitar erro se 'status' não existir
        registros_por_status[status] = registros_por_status.get(status, 0) + 1

        # 5. Identifique sensor com mais registros
        sensor_id = registro['id']
        registros_por_sensor[sensor_id] = registros_por_sensor.get(sensor_id, 0) + 1

    media_temperatura = total_temperatura / cont_temperaturas if cont_temperaturas > 0 else 0

    sensor_com_mais_registros = ""
    max_registros = 0
    if registros_por_sensor: # Verifica se o dicionário não está vazio
        sensor_com_mais_registros = max(registros_por_sensor, key=registros_por_sensor.get)
        max_registros = registros_por_sensor[sensor_com_mais_registros]


    return {
        "total_sensores_unicos": len(sensores_unicos),
        "temperatura_media": round(media_temperatura, 2),
        "temperatura_minima": min_temperatura,
        "temperatura_maxima": max_temperatura,
        "registros_por_status": registros_por_status,
        "sensor_com_mais_registros": sensor_com_mais_registros,
        "numero_registros_sensor_mais_ativo": max_registros
    }

# Teste sua função
try:
    analise = analisar_sensores_iot()
    print(f"Análise dos sensores IoT: {analise}")
except Exception as e:
    print(f"Erro ao processar arquivo: {e}")

# DESAFIO 9: Análise de Produção Industrial
import csv
import os

# Garante que o diretório 'dados' existe
if not os.path.exists('dados'):
    os.makedirs('dados')

# Código para criar o arquivo producao_industrial.csv se ele não existir
csv_content = """data,linha_producao,produto,quantidade_produzida,defeitos,tempo_producao_minutos
2025-06-24,Linha A,Produto X,100,5,60
2025-06-24,Linha B,Produto Y,150,8,90
2025-06-24,Linha A,Produto Z,80,2,45
2025-06-25,Linha A,Produto X,110,6,65
2025-06-25,Linha B,Produto Y,160,7,95
2025-06-25,Linha B,Produto Z,90,3,50
2025-06-26,Linha A,Produto X,120,4,70
"""
with open('dados/producao_industrial.csv', 'w') as f:
    f.write(csv_content)


def analisar_producao_csv():
    """
    Carrega e analisa dados de produção do arquivo CSV

    Returns:
        dict: Análise da produção industrial
    """
    caminho_arquivo = 'dados/producao_industrial.csv'

    producao_total_por_linha = {}
    defeitos_por_produto = {}
    tempo_total_producao_por_produto = {}
    quantidade_total_produzida_por_produto = {}
    producao_total_por_dia = {}

    with open(caminho_arquivo, mode='r', newline='', encoding='utf-8') as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            linha_producao = row['linha_producao']
            produto = row['produto']
            quantidade_produzida = int(row['quantidade_produzida'])
            defeitos = int(row['defeitos'])
            tempo_producao = int(row['tempo_producao_minutos'])
            data = row['data']

            # 2. Calcule produção total por linha de produção
            producao_total_por_linha[linha_producao] = producao_total_por_linha.get(linha_producao, 0) + quantidade_produzida

            # 3. Identifique produto com maior número de defeitos
            defeitos_por_produto[produto] = defeitos_por_produto.get(produto, 0) + defeitos

            # 4. Calcule tempo médio de produção por produto
            tempo_total_producao_por_produto[produto] = tempo_total_producao_por_produto.get(produto, 0) + tempo_producao
            quantidade_total_produzida_por_produto[produto] = quantidade_total_produzida_por_produto.get(produto, 0) + quantidade_produzida

            # 5. Encontre dia com maior produção total
            producao_total_por_dia[data] = producao_total_por_dia.get(data, 0) + quantidade_produzida

    # Processar resultados para média de tempo e produto com mais defeitos
    produto_com_mais_defeitos = ""
    max_defeitos = -1
    if defeitos_por_produto:
        produto_com_mais_defeitos = max(defeitos_por_produto, key=defeitos_por_produto.get)
        max_defeitos = defeitos_por_produto[produto_com_mais_defeitos]

    tempo_medio_producao_por_produto = {}
    for prod, tempo_total in tempo_total_producao_por_produto.items():
        if quantidade_total_produzida_por_produto[prod] > 0:
            tempo_medio_producao_por_produto[prod] = round(tempo_total / quantidade_total_produzida_por_produto[prod], 2)
        else:
            tempo_medio_producao_por_produto[prod] = 0

    dia_com_maior_producao = ""
    max_producao_dia = -1
    if producao_total_por_dia:
        dia_com_maior_producao = max(producao_total_por_dia, key=producao_total_por_dia.get)
        max_producao_dia = producao_total_por_dia[dia_com_maior_producao]

    return {
        "producao_total_por_linha": producao_total_por_linha,
        "produto_com_mais_defeitos": produto_com_mais_defeitos,
        "total_defeitos_produto_mais_defeituoso": max_defeitos,
        "tempo_medio_producao_por_produto": tempo_medio_producao_por_produto,
        "dia_com_maior_producao_total": dia_com_maior_producao,
        "valor_maior_producao_total": max_producao_dia
    }

# Teste sua função
try:
    analise = analisar_producao_csv()
    print(f"Análise da produção: {analise}")
except Exception as e:
    print(f"Erro ao processar arquivo: {e}")
# DESAFIO 10: Geração de Relatório de Qualidade
import json
import os

# Garante que o diretório 'dados' existe
if not os.path.exists('dados'):
    os.makedirs('dados')

# Código para criar o arquivo controle_qualidade.json se ele não existir
json_content_qualidade = """
[
  {"lote": "L001", "produto": "A", "status": "APROVADO", "inspecao_visual": 9.5, "resistencia": 85.2},
  {"lote": "L002", "produto": "B", "status": "REPROVADO", "inspecao_visual": 7.0, "resistencia": 70.1},
  {"lote": "L003", "produto": "A", "status": "APROVADO", "inspecao_visual": 9.0, "resistencia": 88.5},
  {"lote": "L004", "produto": "C", "status": "APROVADO", "inspecao_visual": 8.8, "resistencia": 80.0},
  {"lote": "L005", "produto": "B", "status": "APROVADO", "inspecao_visual": 8.2, "resistencia": 78.9},
  {"lote": "L006", "produto": "A", "status": "REPROVADO", "inspecao_visual": 6.5, "resistencia": 65.0}
]
"""
with open('dados/controle_qualidade.json', 'w') as f:
    f.write(json_content_qualidade)


def gerar_relatorio_qualidade():
    """
    Carrega dados de qualidade, analisa e gera relatório

    Returns:
        str: Caminho do arquivo de relatório gerado
    """
    caminho_dados = 'dados/controle_qualidade.json'
    caminho_relatorio = 'dados/relatorio_qualidade.txt'

    with open(caminho_dados, 'r') as f:
        dados_qualidade = json.load(f)

    total_lotes = len(dados_qualidade)
    aprovados = 0
    reprovados = 0

    soma_inspecao_visual_aprovados = 0
    cont_inspecao_visual_aprovados = 0
    soma_resistencia_aprovados = 0
    cont_resistencia_aprovados = 0

    status_por_produto = {}

    for registro in dados_qualidade:
        if registro['status'] == 'APROVADO':
            aprovados += 1
            soma_inspecao_visual_aprovados += registro['inspecao_visual']
            cont_inspecao_visual_aprovados += 1
            soma_resistencia_aprovados += registro['resistencia']
            cont_resistencia_aprovados += 1
        else:
            reprovados += 1

        produto = registro['produto']
        if produto not in status_por_produto:
            status_por_produto[produto] = {'APROVADO': 0, 'REPROVADO': 0}
        status_por_produto[produto][registro['status']] += 1

    percentual_aprovacao = (aprovados / total_lotes) * 100 if total_lotes > 0 else 0
    media_inspecao_aprovados = soma_inspecao_visual_aprovados / cont_inspecao_visual_aprovados if cont_inspecao_visual_aprovados > 0 else 0
    media_resistencia_aprovados = soma_resistencia_aprovados / cont_resistencia_aprovados if cont_resistencia_aprovados > 0 else 0

    relatorio_texto = f"""--- Relatório de Qualidade ---
Data da Geração: {os.fspath(os.stat(caminho_relatorio).st_mtime) if os.path.exists(caminho_relatorio) else 'N/A'}
Total de Lotes Analisados: {total_lotes}

Resumo Geral:
  Lotes Aprovados: {aprovados} ({percentual_aprovacao:.2f}%)
  Lotes Reprovados: {reprovados} ({100 - percentual_aprovacao:.2f}%)

Métricas de Qualidade (Aprovados):
  Média Inspeção Visual: {media_inspecao_aprovados:.2f}
  Média Resistência: {media_resistencia_aprovados:.2f}

Status por Produto:
"""
    for produto, status_counts in status_por_produto.items():
        relatorio_texto += f"  Produto {produto}:\n"
        relatorio_texto += f"    Aprovados: {status_counts['APROVADO']}\n"
        relatorio_texto += f"    Reprovados: {status_counts['REPROVADO']}\n"

    with open(caminho_relatorio, 'w', encoding='utf-8') as f:
        f.write(relatorio_texto)

    return caminho_relatorio

# Teste sua função
try:
    arquivo_relatorio = gerar_relatorio_qualidade()
    print(f"Relatório gerado: {arquivo_relatorio}")

    # Exibir primeiras linhas do relatório
    with open('dados/relatorio_qualidade.txt', 'r') as f:
        linhas = f.readlines()[:10]
        print("\nPrimeiras linhas do relatório:")
        for linha in linhas:
            print(linha.strip())
except Exception as e:
    print(f"Erro ao gerar relatório: {e}")

Relatório de configurações: {'total_por_status': {'ATIVO': 3, 'MANUTENCAO': 1, 'INATIVO': 1}, 'temperatura_media_ativas': 71.67, 'maquina_maior_valor_config': 'FRESADORA_001'}
Dados filtrados: {'registros_com_baixa_taxa_defeitos': [{'hora': 8, 'linha': 'A', 'producao': 45, 'defeitos': 2, 'taxa_defeitos': 4.44}, {'hora': 9, 'linha': 'A', 'producao': 52, 'defeitos': 1, 'taxa_defeitos': 1.92}, {'hora': 9, 'linha': 'B', 'producao': 41, 'defeitos': 2, 'taxa_defeitos': 4.88}], 'producoes_linha_A': [45, 52, 48], 'horas_com_producao_acima_40': [8, 9, 10]}
Análise dos sensores IoT: {'total_sensores_unicos': 4, 'temperatura_media': 25.44, 'temperatura_minima': 22.0, 'temperatura_maxima': 28.1, 'registros_por_status': {'OK': 5, 'ALERTA': 2, 'CRITICO': 1}, 'sensor_com_mais_registros': 'sensor_temp_01', 'numero_registros_sensor_mais_ativo': 3}
Análise da produção: {'producao_total_por_linha': {'Linha A': 410, 'Linha B': 400}, 'produto_com_mais_defeitos': 'Produto X', 'total_defeitos_produto_mais_de

## DESAFIOS INTERMEDIÁRIOS (11-20)

### Desafio 11: Sistema de Alertas Inteligente
Crie um sistema que analisa múltiplos sensores e gera alertas baseados em combinações de condições.

**Regras de alerta:**
- CRITICO: Temperatura > 40°C E Pressão > 1025 hPa
- ALERTA: Umidade > 80% OU Temperatura < 10°C
- MANUTENCAO: Mesmo sensor com 3+ alertas em sequência

### Desafio 12: Otimização de Linha de Produção
Analise dados de produção para identificar gargalos e otimizações.

### Desafio 13: Análise de Séries Temporais
Analise séries temporais de sensores para detectar padrões.

### Desafio 14: Sistema de Recomendação de Manutenção
Crie um sistema que recomenda manutenção baseado em dados de equipamentos.

### Desafio 15: Processamento de Dados em Lote
Processe grandes volumes de dados em lotes para análise.

### Desafios 16-20: Classificação, Correlações e Previsões
- **Desafio 16:** Classificação de produtos por qualidade
- **Desafio 17:** Análise de correlações entre variáveis
- **Desafio 18:** Previsão simples de demanda
- **Desafio 19:** Otimização de recursos
- **Desafio 20:** Análise de texto de feedback


In [None]:
# DESAFIO 11: Sistema de Alertas Inteligente
import collections

def sistema_alertas_inteligente(dados_sensores):
    """
    Analisa dados de sensores e gera alertas inteligentes

    Args:
        dados_sensores (list): Lista de dicionários com dados dos sensores

    Returns:
        list: Lista de alertas gerados
    """
    alertas_gerados = []
    historico_alertas = {}  # Para rastrear alertas sequenciais por sensor

    for dados in dados_sensores:
        sensor = dados['sensor']
        temperatura = dados.get('temp')
        umidade = dados.get('umidade')
        pressao = dados.get('pressao')
        timestamp = dados.get('timestamp')

        alerta_atual = None

        # Regra CRITICO
        if temperatura is not None and pressao is not None and temperatura > 40 and pressao > 1025:
            alerta_atual = {'sensor': sensor, 'tipo': 'CRITICO', 'mensagem': f'Temperatura ({temperatura}°C) e Pressão ({pressao} hPa) muito altas.', 'timestamp': timestamp}
        # Regra ALERTA
        elif (umidade is not None and umidade > 80) or (temperatura is not None and temperatura < 10):
            if umidade is not None and umidade > 80:
                alerta_atual = {'sensor': sensor, 'tipo': 'ALERTA', 'mensagem': f'Umidade ({umidade}%) muito alta.', 'timestamp': timestamp}
            elif temperatura is not None and temperatura < 10:
                alerta_atual = {'sensor': sensor, 'tipo': 'ALERTA', 'mensagem': f'Temperatura ({temperatura}°C) muito baixa.', 'timestamp': timestamp}

        if alerta_atual:
            alertas_gerados.append(alerta_atual)

            # Atualiza histórico de alertas para o sensor
            if sensor not in historico_alertas:
                historico_alertas[sensor] = []
            historico_alertas[sensor].append(alerta_atual['tipo'])

            # Verifica regra de MANUTENCAO
            if len(historico_alertas[sensor]) >= 3:
                # Verifica se os últimos 3 alertas são do mesmo tipo (qualquer tipo de alerta)
                # Ou seja, 3+ alertas em sequência, não precisa ser o mesmo tipo, apenas 3 alertas no total
                if len(set(historico_alertas[sensor][-3:])) > 0: # Garante que há pelo menos um tipo de alerta nos últimos 3
                    alertas_gerados.append({'sensor': sensor, 'tipo': 'MANUTENCAO', 'mensagem': f'Sensor com 3 ou mais alertas em sequência.', 'timestamp': timestamp})
                    # Opcional: Limpar histórico para evitar múltiplos alertas de manutenção para a mesma sequência
                    historico_alertas[sensor] = []
        else:
            # Se não houve alerta, reseta o histórico para esse sensor para quebrar a sequência
            if sensor in historico_alertas:
                historico_alertas[sensor] = []

    return alertas_gerados

# Dados de teste
dados_teste_desafio11 = [
    {'sensor': 'TEMP_001', 'temp': 42, 'umidade': 65, 'pressao': 1030, 'timestamp': '2024-01-01 08:00'}, # CRITICO
    {'sensor': 'TEMP_001', 'temp': 38, 'umidade': 85, 'pressao': 1015, 'timestamp': '2024-01-01 09:00'}, # ALERTA (umidade)
    {'sensor': 'TEMP_002', 'temp': 8, 'umidade': 45, 'pressao': 1010, 'timestamp': '2024-01-01 08:00'}, # ALERTA (temperatura)
    {'sensor': 'TEMP_001', 'temp': 35, 'umidade': 90, 'pressao': 1010, 'timestamp': '2024-01-01 10:00'}, # ALERTA (umidade) - 3º alerta para TEMP_001
    {'sensor': 'TEMP_001', 'temp': 15, 'umidade': 50, 'pressao': 1000, 'timestamp': '2024-01-01 11:00'}  # Sem alerta para quebrar a sequência de manutenção, mas demonstra a lógica
]

alertas_desafio11 = sistema_alertas_inteligente(dados_teste_desafio11)
print("--- Desafio 11: Sistema de Alertas Inteligente ---")
print(f"Alertas gerados: {alertas_desafio11}")



# DESAFIO 12: Otimização de Linha de Produção
def otimizar_linha_producao(dados_linha):
    """
    Analisa linha de produção e sugere otimizações

    Args:
        dados_linha (list): Dados de cada estação da linha

    Returns:
        dict: Análise e sugestões de otimização
    """
    if not dados_linha:
        return {"analise": "Nenhum dado de linha fornecido."}

    gargalo = None
    menor_eficiencia = None
    tempo_total = 0.0

    for estacao in dados_linha:
        tempo_ciclo = estacao.get('tempo_ciclo', 0.0)
        eficiencia = estacao.get('eficiencia', 0.0)

        tempo_total += tempo_ciclo

        if gargalo is None or tempo_ciclo > gargalo['tempo_ciclo']:
            gargalo = estacao

        if menor_eficiencia is None or eficiencia < menor_eficiencia['eficiencia']:
            menor_eficiencia = estacao

    sugestoes = []
    if gargalo:
        sugestoes.append(f"O gargalo da linha é a estação '{gargalo['estacao']}' com tempo de ciclo de {gargalo['tempo_ciclo']:.1f} unidades. Focar na otimização desta estação pode melhorar o fluxo geral.")
    if menor_eficiencia:
        sugestoes.append(f"A estação com a menor eficiência é '{menor_eficiencia['estacao']}' com {menor_eficiencia['eficiencia']:.1f}%. Investigar as causas da baixa eficiência aqui pode trazer ganhos significativos.")

    return {
        "analise": {
            "gargalo": gargalo,
            "menor_eficiencia": menor_eficiencia,
            "tempo_total_linha": f"{tempo_total:.1f}"
        },
        "sugestoes": sugestoes
    }

linha_teste_desafio12 = [
    {'estacao': 'CORTE', 'tempo_ciclo': 45.2, 'eficiencia': 92.5},
    {'estacao': 'DOBRA', 'tempo_ciclo': 62.1, 'eficiencia': 88.3},
    {'estacao': 'SOLDA', 'tempo_ciclo': 58.7, 'eficiencia': 94.1},
    {'estacao': 'PINTURA', 'tempo_ciclo': 71.5, 'eficiencia': 85.2},
    {'estacao': 'MONTAGEM', 'tempo_ciclo': 48.9, 'eficiencia': 90.7}
]

otimizacao_desafio12 = otimizar_linha_producao(linha_teste_desafio12)
print("\n--- Desafio 12: Otimização de Linha de Produção ---")
print(f"Análise de otimização: {otimizacao_desafio12}")

# DESAFIOS 13-20: Implementações dos desafios restantes

def analisar_series_temporais(dados_sensores):
    """
    Analisa séries temporais de sensores para detectar padrões.

    Args:
        dados_sensores (list): Lista de dicionários com dados de sensores ao longo do tempo,
                                cada um com 'timestamp' e 'valor'.

    Returns:
        dict: Análise contendo tendências, picos e variações.
    """
    if not dados_sensores:
        return {"analise": "Nenhum dado de série temporal fornecido."}

    dados_sensores_ordenados = sorted(dados_sensores, key=lambda x: x.get('timestamp', ''))

    valores = [dado.get('valor') for dado in dados_sensores_ordenados if 'valor' in dado]

    if not valores:
        return {"analise": "Não há valores numéricos para análise."}

    analise = {}

    if len(valores) > 1:
        if valores[-1] > valores[0]:
            analise['tendencia'] = "Aumento geral"
        elif valores[-1] < valores[0]:
            analise['tendencia'] = "Diminuição geral"
        else:
            analise['tendencia'] = "Estável"
    else:
        analise['tendencia'] = "Dados insuficientes para tendência"

    analise['pico_maximo'] = max(valores)
    analise['pico_minimo'] = min(valores)

    if len(valores) > 1:
        media = sum(valores) / len(valores)
        variancia = sum([(x - media) ** 2 for x in valores]) / (len(valores) - 1)
        analise['desvio_padrao'] = variancia ** 0.5
    else:
        analise['desvio_padrao'] = 0.0

    return analise

# Exemplo de uso
dados_ts_teste_desafio13 = [
    {'timestamp': '2024-01-01 08:00', 'valor': 25},
    {'timestamp': '2024-01-01 09:00', 'valor': 27},
    {'timestamp': '2024-01-01 10:00', 'valor': 24},
    {'timestamp': '2024-01-01 11:00', 'valor': 30},
    {'timestamp': '2024-01-01 12:00', 'valor': 28},
]
print("--- Desafio 13: Análise de Séries Temporais ---")
print(f"Análise de série temporal: {analisar_series_temporais(dados_ts_teste_desafio13)}")

def recomendar_manutencao(dados_equipamentos):
    """
    Recomenda manutenção baseada em dados de equipamentos.

    Args:
        dados_equipamentos (list): Lista de dicionários com dados de equipamentos,
                                    e.g., {'id': 'EQP001', 'horas_operacao': 1500, 'temperatura_media': 75, 'vibra_nivel': 0.8}.

    Returns:
        list: Lista de recomendações de manutenção.
    """
    recomendacoes = []
    TEMPERATURA_LIMITE = 80
    HORAS_OPERACAO_LIMITE = 2000
    VIBRACAO_LIMITE = 1.0

    for eqp in dados_equipamentos:
        eqp_id = eqp.get('id', 'N/A')
        horas_op = eqp.get('horas_operacao', 0)
        temp_media = eqp.get('temperatura_media')
        vibra_nivel = eqp.get('vibra_nivel')

        if horas_op >= HORAS_OPERACAO_LIMITE:
            recomendacoes.append(f"Equipamento {eqp_id}: Manutenção preventiva recomendada (horas de operação excedidas: {horas_op}).")
        if temp_media is not None and temp_media > TEMPERATURA_LIMITE:
            recomendacoes.append(f"Equipamento {eqp_id}: Verificar superaquecimento (temperatura média: {temp_media}°C).")
        if vibra_nivel is not None and vibra_nivel > VIBRACAO_LIMITE:
            recomendacoes.append(f"Equipamento {eqp_id}: Verificar vibração excessiva (nível: {vibra_nivel}).")

    return recomendacoes

# Exemplo de uso
dados_eqp_teste_desafio14 = [
    {'id': 'EQP001', 'horas_operacao': 1800, 'temperatura_media': 70, 'vibra_nivel': 0.5},
    {'id': 'EQP002', 'horas_operacao': 2100, 'temperatura_media': 85, 'vibra_nivel': 1.2},
    {'id': 'EQP003', 'horas_operacao': 500, 'temperatura_media': 60, 'vibra_nivel': 0.3},
]
print("\n--- Desafio 14: Sistema de Recomendação de Manutenção ---")
print(f"Recomendações de manutenção: {recomendar_manutencao(dados_eqp_teste_desafio14)}")

def processar_dados_lote(dados, tamanho_lote=100):
    """
    Processa dados em lotes para análise eficiente.

    Args:
        dados (list): Lista de dados a serem processados.
        tamanho_lote (int): Tamanho de cada lote.

    Returns:
        list: Lista de resultados do processamento de cada lote.
    """
    resultados_lote = []
    for i in range(0, len(dados), tamanho_lote):
        lote = dados[i:i + tamanho_lote]
        if lote and isinstance(lote[0], (int, float)):
            resultado_lote = sum(lote)
        else:
            resultado_lote = len(lote)

        resultados_lote.append(f"Lote {i//tamanho_lote + 1} processado, resultado: {resultado_lote}")
    return resultados_lote

# Exemplo de uso
dados_lote_teste_desafio15 = list(range(500))
print("\n--- Desafio 15: Processamento de Dados em Lote ---")
print(f"Processamento em lote: {processar_dados_lote(dados_lote_teste_desafio15, tamanho_lote=75)}")

def processar_dados_lote(dados, tamanho_lote=100):
    """
    Processa dados em lotes para análise eficiente.

    Args:
        dados (list): Lista de dados a serem processados.
        tamanho_lote (int): Tamanho de cada lote.

    Returns:
        list: Lista de resultados do processamento de cada lote.
    """
    resultados_lote = []
    for i in range(0, len(dados), tamanho_lote):
        lote = dados[i:i + tamanho_lote]
        if lote and isinstance(lote[0], (int, float)):
            resultado_lote = sum(lote)
        else:
            resultado_lote = len(lote)

        resultados_lote.append(f"Lote {i//tamanho_lote + 1} processado, resultado: {resultado_lote}")
    return resultados_lote

# Exemplo de uso
dados_lote_teste_desafio15 = list(range(500))
print("\n--- Desafio 15: Processamento de Dados em Lote ---")
print(f"Processamento em lote: {processar_dados_lote(dados_lote_teste_desafio15, tamanho_lote=75)}")

def classificar_produtos(dados_produtos):
    """
    Classifica produtos baseado em atributos de qualidade.

    Args:
        dados_produtos (list): Lista de dicionários com atributos de produtos,
                               e.g., {'id': 'PROD001', 'peso': 100, 'cor': 'azul', 'defeito': False}.

    Returns:
        dict: Dicionário com produtos classificados por qualidade (Bom, Regular, Ruim).
    """
    classificacao = {"Bom": [], "Regular": [], "Ruim": []}

    for produto in dados_produtos:
        qualidade = "Bom"
        if produto.get('defeito', False):
            qualidade = "Ruim"
        elif produto.get('peso') is not None and not (90 <= produto['peso'] <= 110):
            qualidade = "Regular"

        classificacao[qualidade].append(produto.get('id', 'N/A'))

    return classificacao

# Exemplo de uso
dados_prod_teste_desafio16 = [
    {'id': 'PROD001', 'peso': 105, 'cor': 'azul', 'defeito': False},
    {'id': 'PROD002', 'peso': 85, 'cor': 'vermelho', 'defeito': False},
    {'id': 'PROD003', 'peso': 112, 'cor': 'verde', 'defeito': False},
    {'id': 'PROD004', 'peso': 95, 'cor': 'amarelo', 'defeito': True},
]
print("\n--- Desafio 16: Classificação de produtos por qualidade ---")
print(f"Classificação de produtos: {classificar_produtos(dados_prod_teste_desafio16)}")

import collections

def analisar_correlacoes(dados_producao):
    """
    Analisa correlações entre variáveis de produção.
    Simplificado para identificar variáveis que aparecem juntas ou em padrões.

    Args:
        dados_producao (list): Lista de dicionários, cada um representando um registro de produção.

    Returns:
        dict: Análise de correlações (neste exemplo, contagem de co-ocorrências).
    """
    co_ocorrencias = collections.defaultdict(lambda: collections.defaultdict(int))

    chaves = []
    if dados_producao:
        chaves = list(dados_producao[0].keys())

    for i in range(len(chaves)):
        for j in range(i + 1, len(chaves)):
            var1 = chaves[i]
            var2 = chaves[j]
            for registro in dados_producao:
                if var1 in registro and var2 in registro:
                    co_ocorrencias[var1][var2] += 1
                    co_ocorrencias[var2][var1] += 1

    return dict(co_ocorrencias)

# Exemplo de uso
dados_corr_teste_desafio17 = [
    {'temp': 25, 'pressao': 1000, 'status': 'ok'},
    {'temp': 30, 'pressao': 1010, 'status': 'ok'},
    {'temp': 35, 'pressao': 1020, 'status': 'alerta'},
    {'temp': 26, 'pressao': 1005, 'status': 'ok'},
    {'temp': 38, 'pressao': 1025, 'status': 'alerta'},
]
print("\n--- Desafio 17: Análise de correlações entre variáveis ---")
print(f"Análise de correlações (co-ocorrências): {analisar_correlacoes(dados_corr_teste_desafio17)}")

def prever_demanda(historico_demanda):
    """
    Prevê demanda futura baseada em histórico.
    Implementação simples: média dos últimos valores.

    Args:
        historico_demanda (list): Lista de valores numéricos de demanda passada.

    Returns:
        float: Previsão de demanda futura.
    """
    if not historico_demanda:
        return 0.0

    periodos_para_media = min(3, len(historico_demanda))
    previsao = sum(historico_demanda[-periodos_para_media:]) / periodos_para_media

    return previsao

def otimizar_recursos(dados_linhas, recursos_disponiveis):
    """
    Otimiza alocação de recursos em linhas de produção.
    Exemplo simplificado: aloca recursos para a linha com menor eficiência primeiro.

    Args:
        dados_linhas (list): Lista de dicionários, e.g., [{'id': 'LINHA_A', 'eficiencia': 0.8}, ...].
        recursos_disponiveis (int): Número total de unidades de recursos a serem alocadas.

    Returns:
        dict: Alocação de recursos por linha.
    """
    if not dados_linhas or recursos_disponiveis <= 0:
        return {"mensagem": "Dados ou recursos insuficientes para otimização."}

    linhas_ordenadas = sorted(dados_linhas, key=lambda x: x.get('eficiencia', 1.0))

    alocacao_recursos = {linha['id']: 0 for linha in dados_linhas}
    recursos_alocados = 0

    for linha in linhas_ordenadas:
        if recursos_alocados < recursos_disponiveis:
            alocacao_recursos[linha['id']] += 1
            recursos_alocados += 1

    return alocacao_recursos

# Exemplo de uso
linhas_recursos_teste_desafio19 = [
    {'id': 'LINHA_A', 'eficiencia': 0.9},
    {'id': 'LINHA_B', 'eficiencia': 0.7},
    {'id': 'LINHA_C', 'eficiencia': 0.8},
]
recursos_disponiveis_teste_desafio19 = 3
print("\n--- Desafio 19: Otimização de recursos ---")
print(f"Alocação de recursos: {otimizar_recursos(linhas_recursos_teste_desafio19, recursos_disponiveis_teste_desafio19)}")

def analisar_feedback(feedbacks):
    """
    Analisa textos de feedback sobre qualidade.
    Exemplo simplificado: contagem de palavras-chave positivas e negativas.

    Args:
        feedbacks (list): Lista de strings de feedback.

    Returns:
        dict: Análise de sentimento (contagem de termos).
    """
    palavras_positivas = ['bom', 'ótimo', 'excelente', 'satisfeito', 'rápido', 'eficiente']
    palavras_negativas = ['ruim', 'lento', 'defeito', 'problema', 'insatisfeito', 'quebrou']

    analise_sentimento = {'positivo': 0, 'negativo': 0, 'neutro': 0}

    for feedback in feedbacks:
        feedback_lower = feedback.lower()
        positivo = any(p in feedback_lower for p in palavras_positivas)
        negativo = any(n in feedback_lower for n in palavras_negativas)

        if positivo and not negativo:
            analise_sentimento['positivo'] += 1
        elif negativo and not positivo:
            analise_sentimento['negativo'] += 1
        else:
            analise_sentimento['neutro'] += 1

    return analise_sentimento

# Exemplo de uso
feedbacks_teste_desafio20 = [
    "Produto excelente, muito bom!",
    "Serviço lento e com problema.",
    "Entrega rápida e eficiente.",
    "O produto veio com um defeito.",
    "Achei o produto ok, nada de mais.",
]
print("\n--- Desafio 20: Análise de texto de feedback ---")
print(f"Análise de feedback: {analisar_feedback(feedbacks_teste_desafio20)}")


## DESAFIOS AVANÇADOS (21-30)

### Desafio 21: Sistema de Detecção de Anomalias em Tempo Real
Implemente um sistema que detecta anomalias em fluxos de dados.

### Desafio 22: Simulador de Linha de Produção
Simule o funcionamento de uma linha de produção com eventos aleatórios.

### Desafio 23: Algoritmo de Roteamento de Veículos
Implemente algoritmo simples de roteamento para distribuição.

### Desafio 24: Sistema de Gestão de Inventário
Implemente sistema de gestão de inventário com regras de reabastecimento.

### Desafio 25: Algoritmo de Agendamento de Tarefas
Implemente algoritmo de agendamento de tarefas em máquinas.

### Desafios 26-30: Sistemas Complexos
- **Desafio 26:** Sistema de controle de qualidade automatizado
- **Desafio 27:** Modelo de simulação de cadeia de suprimentos
- **Desafio 28:** Sistema de recomendação para manutenção preditiva
- **Desafio 29:** Algoritmo de balanceamento de linha de produção
- **Desafio 30:** Sistema integrado de monitoramento industrial


In [4]:


import collections
import random
import statistics
import time
from datetime import datetime
import math

# DESAFIO 21: Sistema de Detecção de Anomalias em Tempo Real
def detectar_anomalias_tempo_real(fluxo_dados, janela_tamanho=10, desvio_padrao_limite=2):
    """
    Detecta anomalias em fluxo contínuo de dados baseado em desvio padrão.

    Args:
        fluxo_dados (generator): Gerador de dados em tempo real (produz dicionários com 'timestamp' e 'valor').
        janela_tamanho (int): Tamanho da janela móvel para cálculo da média e desvio padrão.
        desvio_padrao_limite (int): Multiplicador do desvio padrão para definir o limite de anomalia.

    Returns:
        generator: Gerador de alertas de anomalias (dicionários com 'timestamp', 'valor', 'alerta').
    """
    janela_valores = collections.deque(maxlen=janela_tamanho)

    for dado in fluxo_dados:
        timestamp, valor = dado.get('timestamp'), dado.get('valor')

        if valor is None:
            continue # Ignora dados sem valor

        janela_valores.append(valor)

        if len(janela_valores) == janela_tamanho:
            media = statistics.mean(janela_valores)
            # stdev precisa de pelo menos 2 pontos para ser calculado
            stdev = statistics.stdev(janela_valores) if len(janela_valores) > 1 else 0

            if stdev > 0:
                limite_superior = media + desvio_padrao_limite * stdev
                limite_inferior = media - desvio_padrao_limite * stdev

                if not (limite_inferior <= valor <= limite_superior):
                    yield {'timestamp': timestamp, 'valor': valor, 'alerta': 'Anomalia detectada: valor fora do desvio padrão esperado.'}

        time.sleep(0.05) # Pequeno atraso para simular o fluxo em tempo real

# Gerador de dados de teste para simular fluxo em tempo real
def gerador_dados_sensores(num_dados=50):
    base_value = 20.0
    for i in range(num_dados):
        value = base_value + random.gauss(0, 2)

        # Introduce anomalias aleatórias
        if random.random() < 0.08 and i > 5: # 8% de chance de anomalia, após alguns dados normais
            value += random.choice([-1, 1]) * random.uniform(8, 15) # Grande desvio

        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        yield {'timestamp': timestamp, 'valor': value}

print("--- DESAFIO 21: Sistema de Detecção de Anomalias em Tempo Real ---")
dados_simulados_21 = gerador_dados_sensores(num_dados=30)
alertas_anomalias_21 = detectar_anomalias_tempo_real(dados_simulados_21, janela_tamanho=5, desvio_padrao_limite=1.8)

for alerta in alertas_anomalias_21:
    print(alerta)

# DESAFIO 22: Simulador de Linha de Produção
def simular_linha_producao(tempo_simulacao_minutos):
    """
    Simula linha de produção com eventos aleatórios.

    Args:
        tempo_simulacao_minutos (int): Tempo total de simulação em minutos.

    Returns:
        dict: Estatísticas da simulação.
    """
    estacoes = {
        'Corte': {'tempo_base': 10, 'eficiencia': 0.95, 'falha_chance': 0.02, 'tempo_reparo': 20, 'unidades_processadas': 0, 'tempo_ocioso': 0, 'em_falha': False, 'tempo_restante_reparo': 0},
        'Dobra': {'tempo_base': 15, 'eficiencia': 0.90, 'falha_chance': 0.03, 'tempo_reparo': 30, 'unidades_processadas': 0, 'tempo_ocioso': 0, 'em_falha': False, 'tempo_restante_reparo': 0},
        'Solda': {'tempo_base': 20, 'eficiencia': 0.92, 'falha_chance': 0.01, 'tempo_reparo': 40, 'unidades_processadas': 0, 'tempo_ocioso': 0, 'em_falha': False, 'tempo_restante_reparo': 0},
    }

    tempo_atual = 0
    unidades_concluidas_total = 0

    print(f"\n--- DESAFIO 22: Simulador de Linha de Produção ---")
    print(f"Iniciando simulação da linha de produção por {tempo_simulacao_minutos} minutos...")

    while tempo_atual < tempo_simulacao_minutos:
        # Avança o tempo em incrementos de 1 minuto
        delta_tempo = 1

        for nome_estacao, dados_estacao in estacoes.items():
            if dados_estacao['em_falha']:
                dados_estacao['tempo_restante_reparo'] -= delta_tempo
                dados_estacao['tempo_ocioso'] += delta_tempo
                if dados_estacao['tempo_restante_reparo'] <= 0:
                    dados_estacao['em_falha'] = False
                    print(f"[{tempo_atual} min] Estação '{nome_estacao}' reparada.")
            else:
                # Chance de falha
                if random.random() < dados_estacao['falha_chance']:
                    dados_estacao['em_falha'] = True
                    dados_estacao['tempo_restante_reparo'] = dados_estacao['tempo_reparo']
                    print(f"[{tempo_atual} min] Estação '{nome_estacao}' falhou! Reparando por {dados_estacao['tempo_reparo']} min.")
                    dados_estacao['tempo_ocioso'] += delta_tempo # O minuto atual também é de ociosidade
                else:
                    # Processa uma unidade
                    dados_estacao['unidades_processadas'] += 1
                    # Se for a última estação e processou uma unidade, conta como concluída
                    if nome_estacao == list(estacoes.keys())[-1]:
                        unidades_concluidas_total += 1

        tempo_atual += delta_tempo
        # time.sleep(0.005) # Pequeno atraso para simulação visual

    estatisticas = {
        'tempo_simulacao_total_minutos': tempo_simulacao_minutos,
        'unidades_concluidas_total': unidades_concluidas_total,
        'estacoes': {}
    }

    for nome_estacao, dados_estacao in estacoes.items():
        estatisticas['estacoes'][nome_estacao] = {
            'unidades_processadas': dados_estacao['unidades_processadas'],
            'tempo_ocioso_minutos': dados_estacao['tempo_ocioso'],
            'status_final': 'Em falha' if dados_estacao['em_falha'] else 'Operacional'
        }
    return estatisticas

estatisticas_simulacao_22 = simular_linha_producao(tempo_simulacao_minutos=100)
print(f"Estatísticas da simulação: {estatisticas_simulacao_22}")

# DESAFIO 23: Algoritmo de Roteamento de Veículos
def rotear_veiculos(pedidos, veiculos):
    """
    Roteia veículos para entrega de pedidos usando um algoritmo guloso.

    Args:
        pedidos (list): Lista de dicionários com pedidos, e.g., {'id': 'P1', 'localizacao': (x, y)}.
        veiculos (list): Lista de dicionários com veículos, e.g., {'id': 'V1', 'capacidade': C, 'localizacao_atual': (x, y)}.

    Returns:
        dict: Plano de roteamento, onde as chaves são IDs dos veículos e os valores são listas de pedidos atribuídos.
    """
    plano_roteamento = {v['id']: [] for v in veiculos}
    pedidos_pendentes = list(pedidos) # Cria uma cópia para poder remover pedidos

    # Simples cálculo de distância euclidiana (pode ser substituído por distância real/tempo de viagem)
    def calcular_distancia(loc1, loc2):
        return ((loc1[0] - loc2[0])**2 + (loc1[1] - loc2[1])**2)**0.5

    print("\n--- DESAFIO 23: Algoritmo de Roteamento de Veículos ---")
    print(f"Pedidos iniciais: {pedidos_pendentes}")
    print(f"Veículos disponíveis: {veiculos}")

    while pedidos_pendentes:
        melhor_atribuicao = None
        menor_distancia = float('inf')

        for pedido in pedidos_pendentes:
            for veiculo in veiculos:
                if veiculo.get('capacidade', 0) > 0: # Verifica se o veículo tem capacidade
                    distancia = calcular_distancia(veiculo['localizacao_atual'], pedido['localizacao'])
                    if distancia < menor_distancia:
                        menor_distancia = distancia
                        melhor_atribuicao = {'pedido': pedido, 'veiculo': veiculo}

        if melhor_atribuicao:
            pedido_atribuido = melhor_atribuicao['pedido']
            veiculo_atribuido = melhor_atribuicao['veiculo']

            plano_roteamento[veiculo_atribuido['id']].append(pedido_atribuido['id'])
            veiculo_atribuido['localizacao_atual'] = pedido_atribuido['localizacao']
            veiculo_atribuido['capacidade'] -= 1 # Decrementa a capacidade
            pedidos_pendentes.remove(pedido_atribuido)
            print(f"Atribuído Pedido {pedido_atribuido['id']} para Veículo {veiculo_atribuido['id']}. Pedidos restantes: {len(pedidos_pendentes)}")
        else:
            print("Não foi possível atribuir mais pedidos. Veículos sem capacidade ou sem pedidos restantes.")
            break # Não há mais atribuições possíveis

    return plano_roteamento

# Dados de teste
pedidos_23 = [
    {'id': 'P1', 'localizacao': (10, 20)},
    {'id': 'P2', 'localizacao': (50, 10)},
    {'id': 'P3', 'localizacao': (30, 40)},
    {'id': 'P4', 'localizacao': (70, 30)},
    {'id': 'P5', 'localizacao': (20, 5)},
]
veiculos_23 = [
    {'id': 'V1', 'capacidade': 2, 'localizacao_atual': (0, 0)},
    {'id': 'V2', 'capacidade': 3, 'localizacao_atual': (0, 0)},
]

plano_23 = rotear_veiculos(pedidos_23, veiculos_23)
print(f"Plano de roteamento final: {plano_23}")

# DESAFIO 24: Sistema de Gestão de Inventário
def gerenciar_inventario(historico_demandas, estoque_atual, estoque_minimo, estoque_maximo):
    """
    Gerencia inventário com regras de reabastecimento.

    Args:
        historico_demandas (dict): Dicionário com histórico de demandas por item, e.g., {'itemA': [10, 12, 8]}.
        estoque_atual (dict): Estoque atual de itens, e.g., {'itemA': 50, 'itemB': 20}.
        estoque_minimo (dict): Nível de estoque mínimo para cada item.
        estoque_maximo (dict): Nível de estoque máximo para cada item (alvo de reabastecimento).

    Returns:
        dict: Pedidos de reabastecimento, e.g., {'itemA': 30}.
    """
    pedidos_reabastecimento = {}

    print("\n--- DESAFIO 24: Sistema de Gestão de Inventário ---")
    print(f"Estoque atual: {estoque_atual}")
    print(f"Histórico de demandas: {historico_demandas}")

    for item, estoque_qtd in estoque_atual.items():
        min_qty = estoque_minimo.get(item, 0)
        max_qty = estoque_maximo.get(item, 0)

        # Regra de reabastecimento: se o estoque atual estiver abaixo do mínimo
        if estoque_qtd < min_qty:
            demanda_media = sum(historico_demandas.get(item, [0])) / len(historico_demandas.get(item, [1]))
            # Calcula a quantidade a pedir para atingir o estoque máximo ou um pouco mais que a demanda média
            quantidade_pedir = max(max_qty - estoque_qtd, int(demanda_media * 1.5)) # Pede o que falta para o maximo ou 1.5x a demanda média

            if quantidade_pedir > 0:
                pedidos_reabastecimento[item] = quantidade_pedir
                print(f"Item {item}: Estoque ({estoque_qtd}) abaixo do mínimo ({min_qty}). Pedido de reabastecimento: {quantidade_pedir} unidades.")
        else:
            print(f"Item {item}: Estoque ({estoque_qtd}) dentro dos limites. Nenhum pedido necessário.")

    return pedidos_reabastecimento

# Dados de teste
historico_demandas_24 = {
    'Parafuso_M8': [100, 120, 90, 110],
    'Placa_Eletronica': [5, 7, 6],
    'Cabo_HDMI': [20, 25, 18, 22],
}
estoque_atual_24 = {
    'Parafuso_M8': 80,
    'Placa_Eletronica': 4,
    'Cabo_HDMI': 30,
}
estoque_minimo_24 = {
    'Parafuso_M8': 100,
    'Placa_Eletronica': 5,
    'Cabo_HDMI': 20,
}
estoque_maximo_24 = {
    'Parafuso_M8': 200,
    'Placa_Eletronica': 15,
    'Cabo_HDMI': 50,
}

pedidos_24 = gerenciar_inventario(historico_demandas_24, estoque_atual_24, estoque_minimo_24, estoque_maximo_24)
print(f"Pedidos de reabastecimento: {pedidos_24}")

# DESAFIO 25: Algoritmo de Agendamento de Tarefas
def agendar_tarefas(tarefas, maquinas):
    """
    Agenda tarefas em máquinas para minimizar o tempo total.
    Usa um algoritmo guloso que atribui a tarefa à máquina com menor carga atual.

    Args:
        tarefas (list): Lista de dicionários com tarefas, e.g., [{'id': 'T1', 'duracao': 10}, ...].
        maquinas (list): Lista de dicionários com máquinas, e.g., [{'id': 'M1', 'carga_atual': 0}, ...].

    Returns:
        dict: Cronograma de tarefas, onde as chaves são IDs das máquinas e os valores são listas de IDs das tarefas atribuídas.
    """
    cronograma = {m['id']: [] for m in maquinas}

    # Inicializa a carga atual para cada máquina
    for maquina in maquinas:
        maquina['carga_atual'] = 0

    # Ordena as tarefas por duração (opcional, pode ser útil para algumas estratégias)
    # tarefas_ordenadas = sorted(tarefas, key=lambda x: x['duracao'], reverse=True) # Longest Processing Time (LPT)
    tarefas_ordenadas = list(tarefas) # Para este guloso simples, a ordem de entrada serve

    print("\n--- DESAFIO 25: Algoritmo de Agendamento de Tarefas ---")
    print(f"Tarefas: {tarefas_ordenadas}")
    print(f"Máquinas: {maquinas}")

    for tarefa in tarefas_ordenadas:
        melhor_maquina = None
        menor_carga = float('inf')

        for maquina in maquinas:
            if maquina['carga_atual'] < menor_carga:
                menor_carga = maquina['carga_atual']
                melhor_maquina = maquina

        if melhor_maquina:
            cronograma[melhor_maquina['id']].append(tarefa['id'])
            melhor_maquina['carga_atual'] += tarefa['duracao']
            print(f"Tarefa {tarefa['id']} (Duração: {tarefa['duracao']}) atribuída à Máquina {melhor_maquina['id']}. Carga atual: {melhor_maquina['carga_atual']}")
        else:
            print(f"Erro: Nenhuma máquina disponível para a tarefa {tarefa['id']}.")

    return cronograma

# Dados de teste
tarefas_25 = [
    {'id': 'T1', 'duracao': 10},
    {'id': 'T2', 'duracao': 5},
    {'id': 'T3', 'duracao': 12},
    {'id': 'T4', 'duracao': 7},
    {'id': 'T5', 'duracao': 8},
]
maquinas_25 = [
    {'id': 'M1', 'carga_atual': 0},
    {'id': 'M2', 'carga_atual': 0},
    {'id': 'M3', 'carga_atual': 0},
]

cronograma_25 = agendar_tarefas(tarefas_25, maquinas_25)
print(f"Cronograma final: {cronograma_25}")

# DESAFIO 26: Sistema de Controle de Qualidade Automatizado
def controlar_qualidade(dados_inspecao):
    """
    Toma decisões de qualidade baseado em regras.

    Args:
        dados_inspecao (dict): Dados de inspeção do produto, e.g.,
                               {'defeitos': 0, 'dimensao_x': 10.1, 'dimensao_y': 20.0, 'acabamento': 'perfeito'}.

    Returns:
        str: Decisão (APROVADO, REPROVADO, RETRABALHO).
    """
    defeitos = dados_inspecao.get('defeitos', 0)
    dimensao_x = dados_inspecao.get('dimensao_x')
    dimensao_y = dados_inspecao.get('dimensao_y')
    acabamento = dados_inspecao.get('acabamento', 'bom')
    resistencia = dados_inspecao.get('resistencia') # Exemplo de novo atributo

    print("\n--- DESAFIO 26: Sistema de Controle de Qualidade Automatizado ---")
    print(f"Dados de inspeção: {dados_inspecao}")

    # Regras de decisão de qualidade
    if defeitos > 2:
        return "REPROVADO"

    if defeitos == 1:
        if acabamento != 'perfeito':
            return "RETRABALHO"
        elif resistencia is not None and resistencia < 70:
            return "RETRABALHO"
        else:
            return "APROVADO" # 1 defeito leve e outros ok

    if dimensao_x is not None and not (9.9 <= dimensao_x <= 10.1):
        if 9.8 <= dimensao_x < 9.9 or 10.1 < dimensao_x <= 10.2:
            return "RETRABALHO"
        else:
            return "REPROVADO"

    if dimensao_y is not None and not (19.9 <= dimensao_y <= 20.1):
        if 19.8 <= dimensao_y < 19.9 or 20.1 < dimensao_y <= 20.2:
            return "RETRABALHO"
        else:
            return "REPROVADO"

    if acabamento == 'ruim':
        return "REPROVADO"
    elif acabamento == 'aceitavel':
        return "RETRABALHO"

    if resistencia is not None and resistencia < 50:
        return "REPROVADO"

    return "APROVADO"

# Dados de teste
dados_inspecao_26_aprovado = {'defeitos': 0, 'dimensao_x': 10.0, 'dimensao_y': 20.0, 'acabamento': 'perfeito', 'resistencia': 90}
dados_inspecao_26_reprovado = {'defeitos': 3, 'dimensao_x': 10.0, 'dimensao_y': 20.0, 'acabamento': 'perfeito', 'resistencia': 80}
dados_inspecao_26_retrabalho = {'defeitos': 1, 'dimensao_x': 10.15, 'dimensao_y': 20.0, 'acabamento': 'bom', 'resistencia': 65}
dados_inspecao_26_dim_fora_retrabalho = {'defeitos': 0, 'dimensao_x': 10.15, 'dimensao_y': 20.0, 'acabamento': 'perfeito', 'resistencia': 90}
dados_inspecao_26_dim_fora_reprovado = {'defeitos': 0, 'dimensao_x': 10.5, 'dimensao_y': 20.0, 'acabamento': 'perfeito', 'resistencia': 90}


print(f"Decisão para Produto Aprovado: {controlar_qualidade(dados_inspecao_26_aprovado)}")
print(f"Decisão para Produto Reprovado (3 defeitos): {controlar_qualidade(dados_inspecao_26_reprovado)}")
print(f"Decisão para Produto Retrabalho (1 defeito e resistência baixa): {controlar_qualidade(dados_inspecao_26_retrabalho)}")
print(f"Decisão para Produto Retrabalho (dimensão x fora do limite aceitável): {controlar_qualidade(dados_inspecao_26_dim_fora_retrabalho)}")
print(f"Decisão para Produto Reprovado (dimensão x muito fora do limite): {controlar_qualidade(dados_inspecao_26_dim_fora_reprovado)}")

# DESAFIO 27: Modelo de Simulação de Cadeia de Suprimentos
def simular_cadeia_suprimentos(tempo_simulacao_dias):
    """
    Simula cadeia de suprimentos com múltiplos elos.

    Args:
        tempo_simulacao_dias (int): Tempo total de simulação em dias.

    Returns:
        dict: Estatísticas da simulação.
    """
    # Níveis de estoque iniciais
    estoque_materia_prima = 100
    estoque_produto_acabado = 50

    # Capacidades e tempos
    capacidade_producao_diaria = 10
    demanda_media_diaria = 8
    chance_demanda_extra = 0.1
    max_demanda_extra = 5
    tempo_entrega_fornecedor_min = 2 # dias
    tempo_entrega_fornecedor_max = 5 # dias
    custo_transporte_por_unidade = 1
    custo_producao_por_unidade = 5

    # Estatísticas
    total_pedidos_atendidos = 0
    total_pedidos_nao_atendidos = 0
    custo_total_suprimentos = 0
    dias_sem_materia_prima = 0
    dias_sem_produto_acabado = 0

    print(f"\n--- DESAFIO 27: Modelo de Simulação de Cadeia de Suprimentos ---")
    print(f"Iniciando simulação da cadeia de suprimentos por {tempo_simulacao_dias} dias...")

    for dia in range(1, tempo_simulacao_dias + 1):
        print(f"\n--- Dia {dia} ---")

        # 1. Chegada de matéria-prima (se houver pedido em trânsito - simplificado aqui)
        # Para uma simulação mais robusta, seria necessário gerenciar pedidos de MP
        # e seus prazos de entrega. Vamos simular uma chegada ocasional.
        if random.random() < 0.2: # 20% de chance de chegada de MP
            chegada_mp = random.randint(20, 50)
            estoque_materia_prima += chegada_mp
            custo_total_suprimentos += chegada_mp * 3 # Custo fictício da MP
            print(f"Chegaram {chegada_mp} unidades de matéria-prima. Estoque MP: {estoque_materia_prima}")

        # 2. Produção na fábrica
        if estoque_materia_prima >= 1: # Assumindo 1 MP por produto
            producao_possivel = min(capacidade_producao_diaria, estoque_materia_prima)
            estoque_produto_acabado += producao_possivel
            estoque_materia_prima -= producao_possivel
            custo_total_suprimentos += producao_possivel * custo_producao_por_unidade
            print(f"Produzidas {producao_possivel} unidades. Estoque PA: {estoque_produto_acabado}, Estoque MP: {estoque_materia_prima}")
        else:
            dias_sem_materia_prima += 1
            print("Fábrica parada por falta de matéria-prima.")

        # 3. Demanda do mercado
        demanda_dia = demanda_media_diaria
        if random.random() < chance_demanda_extra:
            demanda_dia += random.randint(1, max_demanda_extra)

        print(f"Demanda do dia: {demanda_dia} unidades.")

        if estoque_produto_acabado >= demanda_dia:
            estoque_produto_acabado -= demanda_dia
            total_pedidos_atendidos += demanda_dia
            custo_total_suprimentos += demanda_dia * custo_transporte_por_unidade # Custo de distribuição
            print(f"Demanda atendida. Estoque PA: {estoque_produto_acabado}")
        else:
            total_pedidos_nao_atendidos += (demanda_dia - estoque_produto_acabado)
            total_pedidos_atendidos += estoque_produto_acabado # Atende o que tem
            estoque_produto_acabado = 0
            dias_sem_produto_acabado += 1
            print(f"Demanda parcialmente/não atendida. Estoque PA: {estoque_produto_acabado}. Total não atendido: {total_pedidos_nao_atendidos}")

        # time.sleep(0.05) # Pequeno atraso para simulação

    estatisticas = {
        'tempo_simulacao_dias': tempo_simulacao_dias,
        'total_pedidos_atendidos': total_pedidos_atendidos,
        'total_pedidos_nao_atendidos': total_pedidos_nao_atendidos,
        'custo_total_suprimentos': custo_total_suprimentos,
        'estoque_final_materia_prima': estoque_materia_prima,
        'estoque_final_produto_acabado': estoque_produto_acabado,
        'dias_sem_materia_prima': dias_sem_materia_prima,
        'dias_sem_produto_acabado': dias_sem_produto_acabado,
    }
    return estatisticas

estatisticas_cadeia_27 = simular_cadeia_suprimentos(tempo_simulacao_dias=30)
print(f"\nEstatísticas finais da simulação da cadeia de suprimentos: {estatisticas_cadeia_27}")

# DESAFIO 28: Sistema de Recomendação para Manutenção Preditiva
def recomendar_manutencao_preditiva(dados_equipamento):
    """
    Recomenda manutenção baseada em padrões de falha.

    Args:
        dados_equipamento (dict): Dados históricos do equipamento, e.g.,
                                  {'id': 'EQP001', 'horas_operacao': [100, 200, 300], 'temperatura': [70, 75, 80],
                                   'falhas_registradas': [{'tipo': 'superaquecimento', 'horas': 250}]}.

    Returns:
        dict: Recomendações de manutenção.
    """
    recomendacoes = {}
    eqp_id = dados_equipamento.get('id', 'N/A')

    horas_operacao = dados_equipamento.get('horas_operacao', [])
    temperatura_historico = dados_equipamento.get('temperatura', [])
    falhas_registradas = dados_equipamento.get('falhas_registradas', [])

    print("\n--- DESAFIO 28: Sistema de Recomendação para Manutenção Preditiva ---")
    print(f"Analisando equipamento: {eqp_id}")

    # 1. Regra baseada em horas de operação (ex: a cada 500 horas)
    if horas_operacao:
        ultima_hora = horas_operacao[-1]
        proxima_manutencao_sugerida = (int(ultima_hora / 500) + 1) * 500
        recomendacoes['horas_operacao'] = f"Próxima manutenção sugerida em {proxima_manutencao_sugerida} horas de operação."

    # 2. Regra baseada em tendência de temperatura (se estiver aumentando consistentemente)
    if len(temperatura_historico) >= 3:
        if temperatura_historico[-1] > temperatura_historico[-2] and \
           temperatura_historico[-2] > temperatura_historico[-3] and \
           temperatura_historico[-1] > 85: # Limite para alerta
            recomendacoes['temperatura'] = "Tendência de aumento de temperatura observada, com valores altos. Verificar sistema de refrigeração."

    # 3. Regra baseada em frequência de falhas específicas
    falhas_por_tipo = collections.Counter(f['tipo'] for f in falhas_registradas)
    for tipo_falha, contagem in falhas_por_tipo.items():
        if tipo_falha == 'superaquecimento' and contagem >= 2:
            recomendacoes['falhas_superaquecimento'] = "Múltiplos registros de superaquecimento. Inspeção imediata do sistema térmico recomendada."
        if tipo_falha == 'vibracao_anormal' and contagem >= 3:
            recomendacoes['falhas_vibracao'] = "Alta frequência de vibrações anormais. Verificar rolamentos e balanceamento."

    if not recomendacoes:
        recomendacoes['geral'] = "Nenhum padrão de falha crítico detectado no momento. Operação normal."

    return recomendacoes

# Dados de teste
dados_equipamento_28_normal = {
    'id': 'EQP001',
    'horas_operacao': [100, 200, 300, 450],
    'temperatura': [70, 71, 72, 73],
    'falhas_registradas': [],
}
dados_equipamento_28_alerta_temp = {
    'id': 'EQP002',
    'horas_operacao': [500, 600, 700, 750],
    'temperatura': [80, 83, 86, 88], # Tendência de alta e acima do limite
    'falhas_registradas': [{'tipo': 'pequeno_superaquecimento', 'horas': 720}],
}
dados_equipamento_28_alerta_falha = {
    'id': 'EQP003',
    'horas_operacao': [1000, 1100, 1200, 1250],
    'temperatura': [70, 72, 71, 73],
    'falhas_registradas': [
        {'tipo': 'vibracao_anormal', 'horas': 1050},
        {'tipo': 'vibracao_anormal', 'horas': 1120},
        {'tipo': 'vibracao_anormal', 'horas': 1210},
    ],
}

print(f"Recomendações para EQP001: {recomendar_manutencao_preditiva(dados_equipamento_28_normal)}")
print(f"Recomendações para EQP002: {recomendar_manutencao_preditiva(dados_equipamento_28_alerta_temp)}")
print(f"Recomendações para EQP003: {recomendar_manutencao_preditiva(dados_equipamento_28_alerta_falha)}")

# DESAFIO 29: Algoritmo de Balanceamento de Linha de Produção
def balancear_linha_producao(tarefas, estacoes):
    """
    Balanceia carga de trabalho em linha de produção para minimizar a variação.
    Usa um algoritmo guloso: atribui a próxima tarefa à estação que menos processou.

    Args:
        tarefas (list): Lista de dicionários com tarefas, e.g., {'id': 'TarefaA', 'tempo': 10}.
        estacoes (list): Lista de dicionários com estações, e.g., {'id': 'Estacao1', 'carga_total': 0, 'tarefas': []}.

    Returns:
        dict: Distribuição balanceada, onde as chaves são IDs das estações e os valores são listas de IDs de tarefas.
    """
    # Inicializa as estações com carga zero e lista de tarefas vazia
    for estacao in estacoes:
        estacao['carga_total'] = 0
        estacao['tarefas'] = []

    # Ordena as tarefas por tempo, da mais longa para a mais curta (heurística LPT - Longest Processing Time)
    tarefas_ordenadas = sorted(tarefas, key=lambda t: t['tempo'], reverse=True)

    print("\n--- DESAFIO 29: Algoritmo de Balanceamento de Linha de Produção ---")
    print(f"Tarefas a balancear: {tarefas_ordenadas}")
    print(f"Estações disponíveis: {[e['id'] for e in estacoes]}")

    for tarefa in tarefas_ordenadas:
        # Encontra a estação com a menor carga atual
        estacao_menos_carregada = min(estacoes, key=lambda e: e['carga_total'])

        # Atribui a tarefa a essa estação
        estacao_menos_carregada['carga_total'] += tarefa['tempo']
        estacao_menos_carregada['tarefas'].append(tarefa['id'])
        print(f"Tarefa '{tarefa['id']}' (Tempo: {tarefa['tempo']}) atribuída à '{estacao_menos_carregada['id']}'. Carga da estação: {estacao_menos_carregada['carga_total']}.")

    # Formata o resultado
    distribuicao_balanceada = {e['id']: e['tarefas'] for e in estacoes}

    # Calcula a carga máxima e variação para análise
    cargas = [e['carga_total'] for e in estacoes]
    carga_maxima = max(cargas)
    carga_minima = min(cargas)
    media_carga = sum(cargas) / len(cargas)

    variancia = sum([(c - media_carga) ** 2 for c in cargas]) / len(cargas)
    desvio_padrao = math.sqrt(variancia)

    return {
        "distribuicao": distribuicao_balanceada,
        "analise": {
            "carga_maxima_estacao": carga_maxima,
            "carga_minima_estacao": carga_minima,
            "media_carga_estacao": f"{media_carga:.2f}",
            "desvio_padrao_carga": f"{desvio_padrao:.2f}",
            "indicador_balanceamento": f"{((carga_maxima - carga_minima) / media_carga * 100):.2f}%" if media_carga > 0 else "N/A"
        }
    }

# Dados de teste
tarefas_29 = [
    {'id': 'MontarMotor', 'tempo': 25},
    {'id': 'InstalarRodas', 'tempo': 15},
    {'id': 'VerificarFreios', 'tempo': 20},
    {'id': 'PintarCarroceria', 'tempo': 30},
    {'id': 'ChecarEletronica', 'tempo': 10},
    {'id': 'AcabamentoInterno', 'tempo': 18},
    {'id': 'TesteFinal', 'tempo': 12},
]
estacoes_29 = [
    {'id': 'Estacao_A'},
    {'id': 'Estacao_B'},
    {'id': 'Estacao_C'},
]

balanceamento_29 = balancear_linha_producao(tarefas_29, estacoes_29)
print(f"Balanceamento de linha de produção: {balanceamento_29}")

# DESAFIO 30: Sistema Integrado de Monitoramento Industrial
def sistema_monitoramento_industrial(dados_producao_diaria, dados_qualidade, dados_manutencao):
    """
    Sistema integrado de monitoramento industrial.

    Args:
        dados_producao_diaria (list): Lista de dicionários com produção diária,
                                      e.g., {'data': '2025-06-25', 'producao': 100, 'refugos': 5}.
        dados_qualidade (dict): Dados de qualidade, e.g., {'APROVADO': 95, 'REPROVADO': 3, 'RETRABALHO': 2}.
        dados_manutencao (list): Lista de dicionários com registros de manutenção,
                                   e.g., {'equipamento': 'EQP001', 'tipo': 'preventiva', 'custo': 500}.

    Returns:
        dict: Dashboard consolidado de métricas industriais.
    """
    dashboard = {
        'producao_total_periodo': 0,
        'total_refugos': 0,
        'produtividade_media_diaria': 0,
        'indice_qualidade_aprovados': 0,
        'custo_total_manutencao': 0,
        'manutencoes_por_tipo': collections.defaultdict(int),
        'alertas_recentes': [],
        'resumo_executivo': ""
    }

    print("\n--- DESAFIO 30: Sistema Integrado de Monitoramento Industrial ---")
    print("Gerando dashboard consolidado...")

    # 1. Análise de Produção
    if dados_producao_diaria:
        total_producao = sum(d['producao'] for d in dados_producao_diaria)
        total_refugos = sum(d.get('refugos', 0) for d in dados_producao_diaria)
        dias_com_producao = len(dados_producao_diaria)

        dashboard['producao_total_periodo'] = total_producao
        dashboard['total_refugos'] = total_refugos
        dashboard['produtividade_media_diaria'] = f"{total_producao / dias_com_producao:.2f}" if dias_com_producao > 0 else 0

    # 2. Análise de Qualidade
    if dados_qualidade:
        # Garante que as chaves estão em maiúsculas conforme esperado na função (APROVADO, REPROVADO, RETRABALHO)
        total_inspecionados = dados_qualidade.get('APROVADO', 0) + dados_qualidade.get('REPROVADO', 0) + dados_qualidade.get('RETRABALHO', 0)
        if total_inspecionados > 0:
            dashboard['indice_qualidade_aprovados'] = f"{(dados_qualidade.get('APROVADO', 0) / total_inspecionados * 100):.2f}%"
            if dados_qualidade.get('REPROVADO', 0) > 0:
                dashboard['alertas_recentes'].append(f"ALERTA DE QUALIDADE: {dados_qualidade['REPROVADO']} produtos reprovados.")
            if dados_qualidade.get('RETRABALHO', 0) > 0:
                dashboard['alertas_recentes'].append(f"AVISO DE QUALIDADE: {dados_qualidade['RETRABALHO']} produtos em retrabalho.")

    # 3. Análise de Manutenção
    if dados_manutencao:
        custo_total_manutencao = 0
        for manutencao in dados_manutencao:
            custo_total_manutencao += manutencao.get('custo', 0)
            dashboard['manutencoes_por_tipo'][manutencao.get('tipo', 'Outro')] += 1
            if manutencao.get('tipo') == 'corretiva':
                dashboard['alertas_recentes'].append(f"ALERTA DE MANUTENÇÃO: Manutenção corretiva em {manutencao.get('equipamento', 'equipamento não especificado')}.")
        dashboard['custo_total_manutencao'] = custo_total_manutencao

    # Resumo Executivo
    dashboard['resumo_executivo'] = (
        f"Relatório Consolidado de {datetime.now().strftime('%Y-%m-%d %H:%M')}:\n"
        f"- Produção total no período: {dashboard['producao_total_periodo']} unidades, com {dashboard['total_refugos']} refugos.\n"
        f"- Produtividade média diária: {dashboard['produtividade_media_diaria']} unidades.\n"
        f"- Índice de qualidade (aprovados): {dashboard['indice_qualidade_aprovados']}.\n"
        f"- Custo total de manutenção: R$ {dashboard['custo_total_manutencao']:.2f}.\n"
        f"- Tipos de manutenção realizadas: {dict(dashboard['manutencoes_por_tipo'])}.\n"
        f"- Alertas recentes: {'Nenhum' if not dashboard['alertas_recentes'] else '; '.join(dashboard['alertas_recentes'])}."
    )

    return dashboard

# Dados de teste fictícios para o Desafio 30
dados_producao_30 = [
    {'data': '2025-06-20', 'producao': 100, 'refugos': 5},
    {'data': '2025-06-21', 'producao': 110, 'refugos': 3},
    {'data': '2025-06-22', 'producao': 95, 'refugos': 7},
]
dados_qualidade_30 = {
    'APROVADO': 280,
    'REPROVADO': 10,
    'RETRABALHO': 5,
}
dados_manutencao_30 = [
    {'equipamento': 'EQP001', 'tipo': 'preventiva', 'custo': 300},
    {'equipamento': 'EQP002', 'tipo': 'corretiva', 'custo': 700},
    {'equipamento': 'EQP001', 'tipo': 'preventiva', 'custo': 250},
]

dashboard_30 = sistema_monitoramento_industrial(dados_producao_30, dados_qualidade_30, dados_manutencao_30)
print("\n--- Dashboard Consolidado ---")
print(dashboard_30['resumo_executivo'])
print("\nDetalhes do Dashboard:")
for key, value in dashboard_30.items():
    if key != 'resumo_executivo':
        print(f"- {key}: {value}")



--- DESAFIO 21: Sistema de Detecção de Anomalias em Tempo Real ---

--- DESAFIO 22: Simulador de Linha de Produção ---
Iniciando simulação da linha de produção por 100 minutos...
[11 min] Estação 'Solda' falhou! Reparando por 40 min.
[23 min] Estação 'Dobra' falhou! Reparando por 30 min.
[51 min] Estação 'Solda' reparada.
[53 min] Estação 'Dobra' reparada.
Estatísticas da simulação: {'tempo_simulacao_total_minutos': 100, 'unidades_concluidas_total': 59, 'estacoes': {'Corte': {'unidades_processadas': 100, 'tempo_ocioso_minutos': 0, 'status_final': 'Operacional'}, 'Dobra': {'unidades_processadas': 69, 'tempo_ocioso_minutos': 31, 'status_final': 'Operacional'}, 'Solda': {'unidades_processadas': 59, 'tempo_ocioso_minutos': 41, 'status_final': 'Operacional'}}}

--- DESAFIO 23: Algoritmo de Roteamento de Veículos ---
Pedidos iniciais: [{'id': 'P1', 'localizacao': (10, 20)}, {'id': 'P2', 'localizacao': (50, 10)}, {'id': 'P3', 'localizacao': (30, 40)}, {'id': 'P4', 'localizacao': (70, 30)}, {'

## CONCLUSÃO

### Parabéns! 🎉

Você completou todos os **30 desafios** de IA e Ciência de Dados na Indústria!

**O que você aprendeu:**
- ✅ Estruturas condicionais e match-case
- ✅ Laços de repetição (for e while)
- ✅ Estruturas de dados (listas, tuplas, dicionários)
- ✅ Compreensão de listas
- ✅ Leitura e escrita de arquivos
- ✅ Análise de dados sem bibliotecas externas
- ✅ Sistemas complexos de monitoramento
- ✅ Algoritmos de otimização
- ✅ Simulação de processos industriais

**Próximos passos:**
1. Implemente as soluções dos desafios
2. Teste com dados reais
3. Explore bibliotecas como pandas, numpy, matplotlib
4. Aplique os conceitos em projetos reais da indústria

**Agora você está preparado para aplicar Python em cenários reais da indústria!** 🚀


print("="*80)
print("FIM DOS 30 DESAFIOS DE IA E CIÊNCIA DE DADOS NA INDÚSTRIA")
print("="*80)
print()
print("🎯 RESUMO DO NOTEBOOK:")
print("   📊 Dados gerados: 4 arquivos (JSON e CSV)")
print("   📚 Conceitos revisados: Estruturas Python fundamentais")
print("   🏆 Desafios implementados: 30 cenários práticos industriais")
print()
print("💡 DICA PARA SUBMISSÃO:")
print("   - Complete os códigos marcados com '# SEU CÓDIGO AQUI'")
print("   - Teste todas as funções executando as células")
print("   - Verifique se os resultados fazem sentido")
print()
print("🚀 BOA SORTE NA SUA PROVA DISSERTATIVA!")
print("="*80)
